Browse Source

Add ability to get snapshot status for running snapshots

Closes #4946
Igor Motov 11 years ago
parent
commit
a1192044f2
30 changed files with 2423 additions and 34 deletions
  1. 35 1
      docs/reference/modules/snapshots.asciidoc
  2. 27 0
      rest-api-spec/api/snapshot.status.json
  3. 3 0
      src/main/java/org/elasticsearch/action/ActionModule.java
  4. 98 0
      src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotIndexShardStage.java
  5. 157 0
      src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotIndexShardStatus.java
  6. 108 0
      src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotIndexStatus.java
  7. 132 0
      src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotShardsStats.java
  8. 181 0
      src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotStats.java
  9. 198 0
      src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotStatus.java
  10. 47 0
      src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotsStatusAction.java
  11. 129 0
      src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotsStatusRequest.java
  12. 89 0
      src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotsStatusRequestBuilder.java
  13. 89 0
      src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotsStatusResponse.java
  14. 308 0
      src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportNodesSnapshotsStatus.java
  15. 222 0
      src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java
  16. 18 0
      src/main/java/org/elasticsearch/client/ClusterAdminClient.java
  17. 13 1
      src/main/java/org/elasticsearch/client/Requests.java
  18. 20 0
      src/main/java/org/elasticsearch/client/support/AbstractClusterAdminClient.java
  19. 2 0
      src/main/java/org/elasticsearch/gateway/local/state/meta/TransportNodesListGatewayMetaState.java
  20. 2 0
      src/main/java/org/elasticsearch/gateway/local/state/shards/TransportNodesListGatewayStartedShards.java
  21. 9 0
      src/main/java/org/elasticsearch/index/snapshots/IndexShardRepository.java
  22. 61 0
      src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotStatus.java
  23. 60 19
      src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java
  24. 101 12
      src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardSnapshot.java
  25. 2 0
      src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java
  26. 2 0
      src/main/java/org/elasticsearch/repositories/RepositoriesModule.java
  27. 2 0
      src/main/java/org/elasticsearch/rest/action/RestActionModule.java
  28. 77 0
      src/main/java/org/elasticsearch/rest/action/admin/cluster/snapshots/status/RestSnapshotsStatusAction.java
  29. 123 0
      src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
  30. 108 1
      src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreTests.java

+ 35 - 1
docs/reference/modules/snapshots.asciidoc

@@ -175,7 +175,7 @@ started by mistake.
 [float]
 === Restore
 
-A snapshot can be restored using this following command:
+A snapshot can be restored using the following command:
 
 [source,shell]
 -----------------------------------
@@ -205,3 +205,37 @@ closed. The restore operation automatically opens restored indices if they were
 didn't exist in the cluster. If cluster state is restored, the restored templates that don't currently exist in the
 cluster are added and existing templates with the same name are replaced by the restored templates. The restored
 persistent settings are added to the existing persistent settings.
+
+
+[float]
+=== Snapshot status
+
+A list of currently running snapshots with their detailed status information can be obtained using the following command:
+
+[source,shell]
+-----------------------------------
+$ curl -XGET "localhost:9200/_snapshot/_status"
+-----------------------------------
+
+In this format, the command will return information about all currently running snapshots. By specifying a repository name, it's possible
+to limit the results to a particular repository:
+
+[source,shell]
+-----------------------------------
+$ curl -XGET "localhost:9200/_snapshot/my_backup/_status"
+-----------------------------------
+
+If both repository name and snapshot id are specified, this command will return detailed status information for the given snapshot even
+if it's not currently running:
+
+[source,shell]
+-----------------------------------
+$ curl -XGET "localhost:9200/_snapshot/my_backup/snapshot_1/_status"
+-----------------------------------
+
+Multiple ids are also supported:
+
+[source,shell]
+-----------------------------------
+$ curl -XGET "localhost:9200/_snapshot/my_backup/snapshot_1,snapshot_2/_status"
+-----------------------------------

+ 27 - 0
rest-api-spec/api/snapshot.status.json

@@ -0,0 +1,27 @@
+{
+  "snapshot.status": {
+    "documentation": "http://www.elasticsearch.org/guide/en/elasticsearch/reference/master/modules-snapshots.html",
+    "methods": ["GET"],
+    "url": {
+      "path": "/_snapshot/_status",
+      "paths": ["/_snapshot/_status", "/_snapshot/{repository}/_status", "/_snapshot/{repository}/{snapshot}/_status"],
+      "parts": {
+        "repository": {
+          "type": "string",
+          "description": "A repository name"
+        },
+        "snapshot": {
+          "type": "list",
+          "description": "A comma-separated list of snapshot names"
+        }
+      },
+      "params": {
+        "master_timeout": {
+          "type" : "time",
+          "description" : "Explicit operation timeout for connection to master node"
+        }
+      }
+    },
+    "body": null
+  }
+}

+ 3 - 0
src/main/java/org/elasticsearch/action/ActionModule.java

@@ -52,6 +52,8 @@ import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsAction;
 import org.elasticsearch.action.admin.cluster.snapshots.get.TransportGetSnapshotsAction;
 import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotAction;
 import org.elasticsearch.action.admin.cluster.snapshots.restore.TransportRestoreSnapshotAction;
+import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusAction;
+import org.elasticsearch.action.admin.cluster.snapshots.status.TransportSnapshotsStatusAction;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
 import org.elasticsearch.action.admin.cluster.state.TransportClusterStateAction;
 import org.elasticsearch.action.admin.cluster.stats.ClusterStatsAction;
@@ -214,6 +216,7 @@ public class ActionModule extends AbstractModule {
         registerAction(DeleteSnapshotAction.INSTANCE, TransportDeleteSnapshotAction.class);
         registerAction(CreateSnapshotAction.INSTANCE, TransportCreateSnapshotAction.class);
         registerAction(RestoreSnapshotAction.INSTANCE, TransportRestoreSnapshotAction.class);
+        registerAction(SnapshotsStatusAction.INSTANCE, TransportSnapshotsStatusAction.class);
 
         registerAction(IndicesStatsAction.INSTANCE, TransportIndicesStatsAction.class);
         registerAction(IndicesStatusAction.INSTANCE, TransportIndicesStatusAction.class);

+ 98 - 0
src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotIndexShardStage.java

@@ -0,0 +1,98 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.action.admin.cluster.snapshots.status;
+
+import org.elasticsearch.ElasticsearchIllegalArgumentException;
+
+/**
+ */
+public enum SnapshotIndexShardStage {
+
+    /**
+     * Snapshot hasn't started yet
+     */
+    INIT((byte)0, false),
+    /**
+     * Index files are being copied
+     */
+    STARTED((byte)1, false),
+    /**
+     * Snapshot metadata is being written
+     */
+    FINALIZE((byte)2, false),
+    /**
+     * Snapshot completed successfully
+     */
+    DONE((byte)3, true),
+    /**
+     * Snapshot failed
+     */
+    FAILURE((byte)4, true);
+
+    private byte value;
+
+    private boolean completed;
+
+    private SnapshotIndexShardStage(byte value, boolean completed) {
+        this.value = value;
+        this.completed = completed;
+    }
+
+    /**
+     * Returns code that represents the snapshot state
+     *
+     * @return code for the state
+     */
+    public byte value() {
+        return value;
+    }
+
+    /**
+     * Returns true if snapshot completed (successfully or not)
+     *
+     * @return true if snapshot completed, false otherwise
+     */
+    public boolean completed() {
+        return completed;
+    }
+
+    /**
+     * Generate snapshot state from code
+     *
+     * @param value the state code
+     * @return state
+     */
+    public static SnapshotIndexShardStage fromValue(byte value) {
+        switch (value) {
+            case 0:
+                return INIT;
+            case 1:
+                return STARTED;
+            case 2:
+                return FINALIZE;
+            case 3:
+                return DONE;
+            case 4:
+                return FAILURE;
+            default:
+                throw new ElasticsearchIllegalArgumentException("No snapshot shard stage for value [" + value + "]");
+        }
+    }
+}

+ 157 - 0
src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotIndexShardStatus.java

@@ -0,0 +1,157 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.action.admin.cluster.snapshots.status;
+
+import org.elasticsearch.ElasticsearchIllegalArgumentException;
+import org.elasticsearch.action.support.broadcast.BroadcastShardOperationResponse;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentBuilderString;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
+
+import java.io.IOException;
+
+/**
+ */
+public class SnapshotIndexShardStatus extends BroadcastShardOperationResponse implements ToXContent {
+
+    private SnapshotIndexShardStage stage = SnapshotIndexShardStage.INIT;
+
+    private SnapshotStats stats;
+
+    private String nodeId;
+
+    private String failure;
+
+    private SnapshotIndexShardStatus() {
+    }
+
+    SnapshotIndexShardStatus(String index, int shardId, SnapshotIndexShardStage stage) {
+        super(index, shardId);
+        this.stage = stage;
+    }
+
+    SnapshotIndexShardStatus(ShardId shardId, IndexShardSnapshotStatus indexShardStatus) {
+        this(shardId, indexShardStatus, null);
+    }
+
+    SnapshotIndexShardStatus(ShardId shardId, IndexShardSnapshotStatus indexShardStatus, String nodeId) {
+        super(shardId.getIndex(), shardId.getId());
+        switch (indexShardStatus.stage()) {
+            case INIT:
+                stage = SnapshotIndexShardStage.INIT;
+                break;
+            case STARTED:
+                stage = SnapshotIndexShardStage.STARTED;
+                break;
+            case FINALIZE:
+                stage = SnapshotIndexShardStage.FINALIZE;
+                break;
+            case DONE:
+                stage = SnapshotIndexShardStage.DONE;
+                break;
+            case FAILURE:
+                stage = SnapshotIndexShardStage.FAILURE;
+                break;
+            default:
+                throw new ElasticsearchIllegalArgumentException("Unknown stage type " + indexShardStatus.stage());
+        }
+        stats = new SnapshotStats(indexShardStatus);
+        failure = indexShardStatus.failure();
+        this.nodeId = nodeId;
+    }
+
+    /**
+     * Returns snapshot stage
+     */
+    public SnapshotIndexShardStage getStage() {
+        return stage;
+    }
+
+    /**
+     * Returns snapshot stats
+     */
+    public SnapshotStats getStats() {
+        return stats;
+    }
+
+    /**
+     * Returns node id of the node where snapshot is currently running
+     */
+    public String getNodeId() {
+        return nodeId;
+    }
+
+    /**
+     * Returns reason for snapshot failure
+     */
+    public String getFailure() {
+        return failure;
+    }
+
+
+    public static SnapshotIndexShardStatus readShardSnapshotStatus(StreamInput in) throws IOException {
+        SnapshotIndexShardStatus shardStatus = new SnapshotIndexShardStatus();
+        shardStatus.readFrom(in);
+        return shardStatus;
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        super.writeTo(out);
+        out.writeByte(stage.value());
+        stats.writeTo(out);
+        out.writeOptionalString(nodeId);
+        out.writeOptionalString(failure);
+    }
+
+    @Override
+    public void readFrom(StreamInput in) throws IOException {
+        super.readFrom(in);
+        stage = SnapshotIndexShardStage.fromValue(in.readByte());
+        stats = SnapshotStats.readSnapshotStats(in);
+        nodeId = in.readOptionalString();
+        failure = in.readOptionalString();
+    }
+
+    static final class Fields {
+        static final XContentBuilderString STAGE = new XContentBuilderString("stage");
+        static final XContentBuilderString REASON = new XContentBuilderString("reason");
+        static final XContentBuilderString NODE = new XContentBuilderString("node");
+    }
+
+    @Override
+    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        builder.startObject(Integer.toString(getShardId()));
+        builder.field(Fields.STAGE, getStage());
+        stats.toXContent(builder, params);
+        if (getNodeId() != null) {
+            builder.field(Fields.NODE, getNodeId());
+        }
+        if (getFailure() != null) {
+            builder.field(Fields.REASON, getFailure());
+        }
+        builder.endObject();
+        return builder;
+    }
+}

+ 108 - 0
src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotIndexStatus.java

@@ -0,0 +1,108 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.action.admin.cluster.snapshots.status;
+
+import com.google.common.collect.ImmutableMap;
+import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentBuilderString;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Represents snapshot status of all shards in the index
+ */
+public class SnapshotIndexStatus implements Iterable<SnapshotIndexShardStatus>, ToXContent {
+
+    private final String index;
+
+    private final Map<Integer, SnapshotIndexShardStatus> indexShards;
+
+    private final SnapshotShardsStats shardsStats;
+
+    private final SnapshotStats stats;
+
+    SnapshotIndexStatus(String index, Collection<SnapshotIndexShardStatus> shards) {
+        this.index = index;
+
+        ImmutableMap.Builder<Integer, SnapshotIndexShardStatus> builder = ImmutableMap.builder();
+        stats = new SnapshotStats();
+        for (SnapshotIndexShardStatus shard : shards) {
+            builder.put(shard.getShardId(), shard);
+            stats.add(shard.getStats());
+        }
+        shardsStats = new SnapshotShardsStats(shards);
+        indexShards = builder.build();
+    }
+
+    /**
+     * Returns the index name
+     */
+    public String getIndex() {
+        return this.index;
+    }
+
+    /**
+     * A shard id to index snapshot shard status map
+     */
+    public Map<Integer, SnapshotIndexShardStatus> getShards() {
+        return this.indexShards;
+    }
+
+    /**
+     * Shards stats
+     */
+    public SnapshotShardsStats getShardsStats() {
+        return shardsStats;
+    }
+
+    /**
+     * Returns snapshot stats
+     */
+    public SnapshotStats getStats() {
+        return stats;
+    }
+
+    @Override
+    public Iterator<SnapshotIndexShardStatus> iterator() {
+        return indexShards.values().iterator();
+    }
+
+    static final class Fields {
+        static final XContentBuilderString SHARDS = new XContentBuilderString("shards");
+    }
+
+    @Override
+    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        builder.startObject(getIndex(), XContentBuilder.FieldCaseConversion.NONE);
+        shardsStats.toXContent(builder, params);
+        stats.toXContent(builder, params);
+        builder.startObject(Fields.SHARDS);
+        for (SnapshotIndexShardStatus shard : indexShards.values()) {
+            shard.toXContent(builder, params);
+        }
+        builder.endObject();
+        builder.endObject();
+        return builder;
+    }
+}

+ 132 - 0
src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotShardsStats.java

@@ -0,0 +1,132 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.action.admin.cluster.snapshots.status;
+
+import org.elasticsearch.ElasticsearchIllegalArgumentException;
+import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentBuilderString;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * Status of a snapshot shards
+ */
+public class SnapshotShardsStats  implements ToXContent {
+
+    private int initializingShards;
+    private int startedShards;
+    private int finalizingShards;
+    private int doneShards;
+    private int failedShards;
+    private int totalShards;
+
+    SnapshotShardsStats(Collection<SnapshotIndexShardStatus> shards) {
+        for (SnapshotIndexShardStatus shard : shards) {
+            totalShards++;
+            switch (shard.getStage()) {
+                case INIT:
+                    initializingShards++;
+                    break;
+                case STARTED:
+                    startedShards++;
+                    break;
+                case FINALIZE:
+                    finalizingShards++;
+                    break;
+                case DONE:
+                    doneShards++;
+                    break;
+                case FAILURE:
+                    failedShards++;
+                    break;
+                default:
+                    throw new ElasticsearchIllegalArgumentException("Unknown stage type " + shard.getStage());
+            }
+        }
+    }
+
+    /**
+     * Number of shards with the snapshot in the initializing stage
+     */
+    public int getInitializingShards() {
+        return initializingShards;
+    }
+
+    /**
+     * Number of shards with the snapshot in the started stage
+     */
+    public int getStartedShards() {
+        return startedShards;
+    }
+
+    /**
+     * Number of shards with the snapshot in the finalizing stage
+     */
+    public int getFinalizingShards() {
+        return finalizingShards;
+    }
+
+    /**
+     * Number of shards with completed snapshot
+     */
+    public int getDoneShards() {
+        return doneShards;
+    }
+
+    /**
+     * Number of shards with failed snapshot
+     */
+    public int getFailedShards() {
+        return failedShards;
+    }
+
+    /**
+     * Total number of shards
+     */
+    public int getTotalShards() {
+        return totalShards;
+    }
+
+    static final class Fields {
+        static final XContentBuilderString SHARDS_STATS = new XContentBuilderString("shards_stats");
+        static final XContentBuilderString INITIALIZING = new XContentBuilderString("initializing");
+        static final XContentBuilderString STARTED = new XContentBuilderString("started");
+        static final XContentBuilderString FINALIZING = new XContentBuilderString("finalizing");
+        static final XContentBuilderString DONE = new XContentBuilderString("done");
+        static final XContentBuilderString FAILED = new XContentBuilderString("failed");
+        static final XContentBuilderString TOTAL = new XContentBuilderString("total");
+    }
+
+    @Override
+    public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
+        builder.startObject(Fields.SHARDS_STATS);
+        builder.field(Fields.INITIALIZING, getInitializingShards());
+        builder.field(Fields.STARTED, getStartedShards());
+        builder.field(Fields.FINALIZING, getFinalizingShards());
+        builder.field(Fields.DONE, getDoneShards());
+        builder.field(Fields.FAILED, getFailedShards());
+        builder.field(Fields.TOTAL, getTotalShards());
+        builder.endObject();
+        return builder;
+    }
+
+}

+ 181 - 0
src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotStats.java

@@ -0,0 +1,181 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.action.admin.cluster.snapshots.status;
+
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Streamable;
+import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentBuilderString;
+import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
+
+import java.io.IOException;
+
+/**
+ */
+public class SnapshotStats implements Streamable, ToXContent {
+    private long startTime;
+
+    private long time;
+
+    private int numberOfFiles;
+
+    private int processedFiles;
+
+    private long totalSize;
+
+    private long processedSize;
+
+    SnapshotStats() {
+    }
+
+    SnapshotStats(IndexShardSnapshotStatus indexShardStatus) {
+        startTime = indexShardStatus.startTime();
+        time = indexShardStatus.time();
+        numberOfFiles = indexShardStatus.numberOfFiles();
+        processedFiles = indexShardStatus.processedFiles();
+        totalSize = indexShardStatus.totalSize();
+        processedSize = indexShardStatus.processedSize();
+    }
+
+    /**
+     * Returns time when snapshot started
+     */
+    public long getStartTime() {
+        return startTime;
+    }
+
+    /**
+     * Returns snapshot running time
+     */
+    public long getTime() {
+        return time;
+    }
+
+    /**
+     * Returns number of files in the snapshot
+     */
+    public int getNumberOfFiles() {
+        return numberOfFiles;
+    }
+
+    /**
+     * Returns number of files in the snapshot that were processed so far
+     */
+    public int getProcessedFiles() {
+        return processedFiles;
+    }
+
+    /**
+     * Returns total size of files in the snapshot
+     */
+    public long getTotalSize() {
+        return totalSize;
+    }
+
+    /**
+     * Returns total size of files in the snapshot that were processed so far
+     */
+    public long getProcessedSize() {
+        return processedSize;
+    }
+
+
+    public static SnapshotStats readSnapshotStats(StreamInput in) throws IOException {
+        SnapshotStats stats = new SnapshotStats();
+        stats.readFrom(in);
+        return stats;
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        out.writeVLong(startTime);
+        out.writeVLong(time);
+
+        out.writeVInt(numberOfFiles);
+        out.writeVInt(processedFiles);
+
+        out.writeVLong(totalSize);
+        out.writeVLong(processedSize);
+    }
+
+    @Override
+    public void readFrom(StreamInput in) throws IOException {
+        startTime = in.readVLong();
+        time = in.readVLong();
+
+        numberOfFiles = in.readVInt();
+        processedFiles = in.readVInt();
+
+        totalSize = in.readVLong();
+        processedSize = in.readVLong();
+    }
+
+    static final class Fields {
+        static final XContentBuilderString STATS = new XContentBuilderString("stats");
+        static final XContentBuilderString NUMBER_OF_FILES = new XContentBuilderString("number_of_files");
+        static final XContentBuilderString PROCESSED_FILES = new XContentBuilderString("processed_files");
+        static final XContentBuilderString TOTAL_SIZE_IN_BYTES = new XContentBuilderString("total_size_in_bytes");
+        static final XContentBuilderString TOTAL_SIZE = new XContentBuilderString("total_size");
+        static final XContentBuilderString PROCESSED_SIZE_IN_BYTES = new XContentBuilderString("processed_size_in_bytes");
+        static final XContentBuilderString PROCESSED_SIZE = new XContentBuilderString("processed_size");
+        static final XContentBuilderString START_TIME_IN_MILLIS = new XContentBuilderString("start_time_in_millis");
+        static final XContentBuilderString TIME_IN_MILLIS = new XContentBuilderString("time_in_millis");
+        static final XContentBuilderString TIME = new XContentBuilderString("time");
+    }
+
+    @Override
+    public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
+        builder.startObject(Fields.STATS);
+        builder.field(Fields.NUMBER_OF_FILES, getNumberOfFiles());
+        builder.field(Fields.PROCESSED_FILES, getProcessedFiles());
+        builder.byteSizeField(Fields.TOTAL_SIZE_IN_BYTES, Fields.TOTAL_SIZE, getTotalSize());
+        builder.byteSizeField(Fields.PROCESSED_SIZE_IN_BYTES, Fields.PROCESSED_SIZE, getProcessedSize());
+        builder.field(Fields.START_TIME_IN_MILLIS, getStartTime());
+        builder.timeValueField(Fields.TIME_IN_MILLIS, Fields.TIME, getTime());
+        builder.endObject();
+        return builder;
+    }
+
+    void add(SnapshotStats stats) {
+        numberOfFiles += stats.numberOfFiles;
+        processedFiles += stats.processedFiles;
+
+        totalSize += stats.totalSize;
+        processedSize += stats.processedSize;
+
+
+        if (startTime == 0) {
+            // First time here
+            startTime = stats.startTime;
+            time = stats.time;
+        } else {
+            // The time the last snapshot ends
+            long endTime = Math.max(startTime + time, stats.startTime + stats.time);
+
+            // The time the first snapshot starts
+            startTime = Math.min(startTime, stats.startTime);
+
+            // Update duration
+            time = endTime - startTime;
+        }
+    }
+}

+ 198 - 0
src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotStatus.java

@@ -0,0 +1,198 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.action.admin.cluster.snapshots.status;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.elasticsearch.cluster.metadata.SnapshotId;
+import org.elasticsearch.cluster.metadata.SnapshotMetaData.State;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Streamable;
+import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentBuilderString;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static com.google.common.collect.Lists.newArrayList;
+import static com.google.common.collect.Sets.newHashSet;
+
+/**
+ * Status of a snapshot
+ */
+public class SnapshotStatus implements ToXContent, Streamable {
+
+    private SnapshotId snapshotId;
+
+    private State state;
+
+    private ImmutableList<SnapshotIndexShardStatus> shards;
+
+    private ImmutableMap<String, SnapshotIndexStatus> indicesStatus;
+
+    private SnapshotShardsStats shardsStats;
+
+    private SnapshotStats stats;
+
+
+    SnapshotStatus(SnapshotId snapshotId, State state, ImmutableList<SnapshotIndexShardStatus> shards) {
+        this.snapshotId = snapshotId;
+        this.state = state;
+        this.shards = shards;
+        shardsStats = new SnapshotShardsStats(shards);
+        updateShardStats();
+    }
+
+    SnapshotStatus() {
+    }
+
+    /**
+     * Returns snapshot id
+     */
+    public SnapshotId getSnapshotId() {
+        return snapshotId;
+    }
+
+    /**
+     * Returns snapshot state
+     */
+    public State getState() {
+        return state;
+    }
+
+    /**
+     * Returns list of snapshot shards
+     */
+    public List<SnapshotIndexShardStatus> getShards() {
+        return shards;
+    }
+
+    public SnapshotShardsStats getShardsStats() {
+        return shardsStats;
+    }
+
+    /**
+     * Returns list of snapshot indices
+     */
+    public Map<String, SnapshotIndexStatus> getIndices() {
+        if (this.indicesStatus != null) {
+            return this.indicesStatus;
+        }
+
+        ImmutableMap.Builder<String, SnapshotIndexStatus> indicesStatus = ImmutableMap.builder();
+
+        Set<String> indices = newHashSet();
+        for (SnapshotIndexShardStatus shard : shards) {
+            indices.add(shard.getIndex());
+        }
+
+        for (String index : indices) {
+            List<SnapshotIndexShardStatus> shards = newArrayList();
+            for (SnapshotIndexShardStatus shard : this.shards) {
+                if (shard.getIndex().equals(index)) {
+                    shards.add(shard);
+                }
+            }
+            indicesStatus.put(index, new SnapshotIndexStatus(index, shards));
+        }
+        this.indicesStatus = indicesStatus.build();
+        return this.indicesStatus;
+
+    }
+
+    @Override
+    public void readFrom(StreamInput in) throws IOException {
+        snapshotId = SnapshotId.readSnapshotId(in);
+        state = State.fromValue(in.readByte());
+        int size = in.readVInt();
+        ImmutableList.Builder<SnapshotIndexShardStatus> builder = ImmutableList.builder();
+        for (int i = 0; i < size; i++) {
+            builder.add(SnapshotIndexShardStatus.readShardSnapshotStatus(in));
+        }
+        shards = builder.build();
+        updateShardStats();
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        snapshotId.writeTo(out);
+        out.writeByte(state.value());
+        out.writeVInt(shards.size());
+        for (SnapshotIndexShardStatus shard : shards) {
+            shard.writeTo(out);
+        }
+    }
+
+    /**
+     * Reads snapshot status from stream input
+     *
+     * @param in stream input
+     * @return deserialized snapshot status
+     * @throws IOException
+     */
+    public static SnapshotStatus readSnapshotStatus(StreamInput in) throws IOException {
+        SnapshotStatus snapshotInfo = new SnapshotStatus();
+        snapshotInfo.readFrom(in);
+        return snapshotInfo;
+    }
+
+    /**
+     * Returns number of files in the snapshot
+     */
+    public SnapshotStats getStats() {
+        return stats;
+    }
+
+    static final class Fields {
+        static final XContentBuilderString SNAPSHOT = new XContentBuilderString("snapshot");
+        static final XContentBuilderString REPOSITORY = new XContentBuilderString("repository");
+        static final XContentBuilderString STATE = new XContentBuilderString("state");
+        static final XContentBuilderString INDICES = new XContentBuilderString("indices");
+    }
+
+    @Override
+    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        builder.startObject();
+        builder.field(Fields.SNAPSHOT, snapshotId.getSnapshot());
+        builder.field(Fields.REPOSITORY, snapshotId.getRepository());
+        builder.field(Fields.STATE, state.name());
+        shardsStats.toXContent(builder, params);
+        stats.toXContent(builder, params);
+        builder.startObject(Fields.INDICES);
+        for (SnapshotIndexStatus indexStatus : getIndices().values()) {
+            indexStatus.toXContent(builder, params);
+        }
+        builder.endObject();
+        builder.endObject();
+        return builder;
+    }
+
+    private void updateShardStats() {
+        stats = new SnapshotStats();
+        shardsStats = new SnapshotShardsStats(shards);
+        for (SnapshotIndexShardStatus shard : shards) {
+            stats.add(shard.getStats());
+        }
+    }
+}

+ 47 - 0
src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotsStatusAction.java

@@ -0,0 +1,47 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.action.admin.cluster.snapshots.status;
+
+import org.elasticsearch.action.admin.cluster.ClusterAction;
+import org.elasticsearch.client.ClusterAdminClient;
+
+/**
+ * Snapshots status action
+ */
+public class SnapshotsStatusAction extends ClusterAction<SnapshotsStatusRequest, SnapshotsStatusResponse, SnapshotsStatusRequestBuilder> {
+
+    public static final SnapshotsStatusAction INSTANCE = new SnapshotsStatusAction();
+    public static final String NAME = "cluster/snapshot/status";
+
+    private SnapshotsStatusAction() {
+        super(NAME);
+    }
+
+    @Override
+    public SnapshotsStatusResponse newResponse() {
+        return new SnapshotsStatusResponse();
+    }
+
+    @Override
+    public SnapshotsStatusRequestBuilder newRequestBuilder(ClusterAdminClient client) {
+        return new SnapshotsStatusRequestBuilder(client);
+    }
+}
+

+ 129 - 0
src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotsStatusRequest.java

@@ -0,0 +1,129 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.action.admin.cluster.snapshots.status;
+
+import org.elasticsearch.action.ActionRequestValidationException;
+import org.elasticsearch.action.support.master.MasterNodeOperationRequest;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+
+import java.io.IOException;
+
+import static org.elasticsearch.action.ValidateActions.addValidationError;
+
+/**
+ * Get snapshot status request
+ */
+public class SnapshotsStatusRequest extends MasterNodeOperationRequest<SnapshotsStatusRequest> {
+
+    private String repository;
+
+    private String[] snapshots = Strings.EMPTY_ARRAY;
+
+    SnapshotsStatusRequest() {
+    }
+
+    /**
+     * Constructs a new get snapshots request with given repository name and list of snapshots
+     *
+     * @param repository repository name
+     * @param snapshots  list of snapshots
+     */
+    public SnapshotsStatusRequest(String repository, String[] snapshots) {
+        this.repository = repository;
+        this.snapshots = snapshots;
+    }
+
+    /**
+     * Constructs a new get snapshots request with given repository name
+     *
+     * @param repository repository name
+     */
+    public SnapshotsStatusRequest(String repository) {
+        this.repository = repository;
+    }
+
+    @Override
+    public ActionRequestValidationException validate() {
+        ActionRequestValidationException validationException = null;
+        if (repository == null) {
+            validationException = addValidationError("repository is missing", validationException);
+        }
+        if (snapshots == null) {
+            validationException = addValidationError("snapshots is null", validationException);
+        }
+        return validationException;
+    }
+
+    /**
+     * Sets repository name
+     *
+     * @param repository repository name
+     * @return this request
+     */
+    public SnapshotsStatusRequest repository(String repository) {
+        this.repository = repository;
+        return this;
+    }
+
+    /**
+     * Returns repository name
+     *
+     * @return repository name
+     */
+    public String repository() {
+        return this.repository;
+    }
+
+    /**
+     * Returns the names of the snapshots.
+     *
+     * @return the names of snapshots
+     */
+    public String[] snapshots() {
+        return this.snapshots;
+    }
+
+    /**
+     * Sets the list of snapshots to be returned
+     *
+     * @param snapshots
+     * @return this request
+     */
+    public SnapshotsStatusRequest snapshots(String[] snapshots) {
+        this.snapshots = snapshots;
+        return this;
+    }
+
+    @Override
+    public void readFrom(StreamInput in) throws IOException {
+        super.readFrom(in);
+        repository = in.readString();
+        snapshots = in.readStringArray();
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        super.writeTo(out);
+        out.writeString(repository);
+        out.writeStringArray(snapshots);
+    }
+}

+ 89 - 0
src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotsStatusRequestBuilder.java

@@ -0,0 +1,89 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.action.admin.cluster.snapshots.status;
+
+import com.google.common.collect.ObjectArrays;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
+import org.elasticsearch.client.ClusterAdminClient;
+import org.elasticsearch.client.internal.InternalClusterAdminClient;
+
+/**
+ * Snapshots status request builder
+ */
+public class SnapshotsStatusRequestBuilder extends MasterNodeOperationRequestBuilder<SnapshotsStatusRequest, SnapshotsStatusResponse, SnapshotsStatusRequestBuilder> {
+
+    /**
+     * Constructs the new snapshotstatus request
+     *
+     * @param clusterAdminClient cluster admin client
+     */
+    public SnapshotsStatusRequestBuilder(ClusterAdminClient clusterAdminClient) {
+        super((InternalClusterAdminClient) clusterAdminClient, new SnapshotsStatusRequest());
+    }
+
+    /**
+     * Constructs the new snapshot status request with specified repository
+     *
+     * @param clusterAdminClient cluster admin client
+     * @param repository         repository name
+     */
+    public SnapshotsStatusRequestBuilder(ClusterAdminClient clusterAdminClient, String repository) {
+        super((InternalClusterAdminClient) clusterAdminClient, new SnapshotsStatusRequest(repository));
+    }
+
+    /**
+     * Sets the repository name
+     *
+     * @param repository repository name
+     * @return this builder
+     */
+    public SnapshotsStatusRequestBuilder setRepository(String repository) {
+        request.repository(repository);
+        return this;
+    }
+
+    /**
+     * Sets list of snapshots to return
+     *
+     * @param snapshots list of snapshots
+     * @return this builder
+     */
+    public SnapshotsStatusRequestBuilder setSnapshots(String... snapshots) {
+        request.snapshots(snapshots);
+        return this;
+    }
+
+    /**
+     * Adds additional snapshots to the list of snapshots to return
+     *
+     * @param snapshots additional snapshots
+     * @return this builder
+     */
+    public SnapshotsStatusRequestBuilder addSnapshots(String... snapshots) {
+        request.snapshots(ObjectArrays.concat(request.snapshots(), snapshots, String.class));
+        return this;
+    }
+
+    @Override
+    protected void doExecute(ActionListener<SnapshotsStatusResponse> listener) {
+        ((ClusterAdminClient) client).snapshotsStatus(request, listener);
+    }
+}

+ 89 - 0
src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotsStatusResponse.java

@@ -0,0 +1,89 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.action.admin.cluster.snapshots.status;
+
+import com.google.common.collect.ImmutableList;
+import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentBuilderString;
+
+import java.io.IOException;
+
+/**
+ * Snapshot status response
+ */
+public class SnapshotsStatusResponse extends ActionResponse implements ToXContent {
+
+    private ImmutableList<SnapshotStatus> snapshots = ImmutableList.of();
+
+    SnapshotsStatusResponse() {
+    }
+
+    SnapshotsStatusResponse(ImmutableList<SnapshotStatus> snapshots) {
+        this.snapshots = snapshots;
+    }
+
+    /**
+     * Returns the list of snapshots
+     *
+     * @return the list of snapshots
+     */
+    public ImmutableList<SnapshotStatus> getSnapshots() {
+        return snapshots;
+    }
+
+    @Override
+    public void readFrom(StreamInput in) throws IOException {
+        super.readFrom(in);
+        int size = in.readVInt();
+        ImmutableList.Builder<SnapshotStatus> builder = ImmutableList.builder();
+        for (int i = 0; i < size; i++) {
+            builder.add(SnapshotStatus.readSnapshotStatus(in));
+        }
+        snapshots = builder.build();
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        super.writeTo(out);
+        out.writeVInt(snapshots.size());
+        for (SnapshotStatus snapshotInfo : snapshots) {
+            snapshotInfo.writeTo(out);
+        }
+    }
+
+    static final class Fields {
+        static final XContentBuilderString SNAPSHOTS = new XContentBuilderString("snapshots");
+    }
+
+    @Override
+    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        builder.startArray(Fields.SNAPSHOTS);
+        for (SnapshotStatus snapshot : snapshots) {
+            snapshot.toXContent(builder, params);
+        }
+        builder.endArray();
+        return builder;
+    }
+
+}

+ 308 - 0
src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportNodesSnapshotsStatus.java

@@ -0,0 +1,308 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.action.admin.cluster.snapshots.status;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.FailedNodeException;
+import org.elasticsearch.action.support.nodes.*;
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.cluster.ClusterService;
+import org.elasticsearch.cluster.metadata.SnapshotId;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
+import org.elasticsearch.snapshots.SnapshotsService;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.TransportService;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReferenceArray;
+
+/**
+ * Transport client that collects snapshot shard statuses from data nodes
+ */
+public class TransportNodesSnapshotsStatus extends TransportNodesOperationAction<TransportNodesSnapshotsStatus.Request, TransportNodesSnapshotsStatus.NodesSnapshotStatus, TransportNodesSnapshotsStatus.NodeRequest, TransportNodesSnapshotsStatus.NodeSnapshotStatus> {
+
+    private final SnapshotsService snapshotsService;
+
+    @Inject
+    public TransportNodesSnapshotsStatus(Settings settings, ClusterName clusterName, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, SnapshotsService snapshotsService) {
+        super(settings, clusterName, threadPool, clusterService, transportService);
+        this.snapshotsService = snapshotsService;
+    }
+
+    public void status(String[] nodesIds, SnapshotId[] snapshotIds, @Nullable TimeValue timeout, ActionListener<NodesSnapshotStatus> listener) {
+        execute(new Request(nodesIds).snapshotIds(snapshotIds).timeout(timeout), listener);
+    }
+
+    @Override
+    protected String executor() {
+        return ThreadPool.Names.GENERIC;
+    }
+
+    @Override
+    protected String transportAction() {
+        return "cluster/snapshot/status/nodes";
+    }
+
+    @Override
+    protected boolean transportCompress() {
+        return true; // compress since the metadata can become large
+    }
+
+    @Override
+    protected Request newRequest() {
+        return new Request();
+    }
+
+    @Override
+    protected NodeRequest newNodeRequest() {
+        return new NodeRequest();
+    }
+
+    @Override
+    protected NodeRequest newNodeRequest(String nodeId, Request request) {
+        return new NodeRequest(nodeId, request);
+    }
+
+    @Override
+    protected NodeSnapshotStatus newNodeResponse() {
+        return new NodeSnapshotStatus();
+    }
+
+    @Override
+    protected NodesSnapshotStatus newResponse(Request request, AtomicReferenceArray responses) {
+        final List<NodeSnapshotStatus> nodesList = Lists.newArrayList();
+        final List<FailedNodeException> failures = Lists.newArrayList();
+        for (int i = 0; i < responses.length(); i++) {
+            Object resp = responses.get(i);
+            if (resp instanceof NodeSnapshotStatus) { // will also filter out null response for unallocated ones
+                nodesList.add((NodeSnapshotStatus) resp);
+            } else if (resp instanceof FailedNodeException) {
+                failures.add((FailedNodeException) resp);
+            } else {
+                logger.warn("unknown response type [{}], expected NodeSnapshotStatus or FailedNodeException", resp);
+            }
+        }
+        return new NodesSnapshotStatus(clusterName, nodesList.toArray(new NodeSnapshotStatus[nodesList.size()]),
+                failures.toArray(new FailedNodeException[failures.size()]));
+    }
+
+    @Override
+    protected NodeSnapshotStatus nodeOperation(NodeRequest request) throws ElasticsearchException {
+        ImmutableMap.Builder<SnapshotId, ImmutableMap<ShardId, SnapshotIndexShardStatus>> snapshotMapBuilder = ImmutableMap.builder();
+        try {
+            String nodeId = clusterService.localNode().id();
+            for (SnapshotId snapshotId : request.snapshotIds) {
+                ImmutableMap<ShardId, IndexShardSnapshotStatus> shardsStatus = snapshotsService.currentSnapshotShards(snapshotId);
+                if (shardsStatus == null) {
+                    continue;
+                }
+                ImmutableMap.Builder<ShardId, SnapshotIndexShardStatus> shardMapBuilder = ImmutableMap.builder();
+                for (ImmutableMap.Entry<ShardId, IndexShardSnapshotStatus> shardEntry : shardsStatus.entrySet()) {
+                    SnapshotIndexShardStatus shardStatus;
+                    IndexShardSnapshotStatus.Stage stage = shardEntry.getValue().stage();
+                    if (stage != IndexShardSnapshotStatus.Stage.DONE && stage != IndexShardSnapshotStatus.Stage.FAILURE) {
+                        // Store node id for the snapshots that are currently running.
+                        shardStatus = new SnapshotIndexShardStatus(shardEntry.getKey(), shardEntry.getValue(), nodeId);
+                    } else {
+                        shardStatus = new SnapshotIndexShardStatus(shardEntry.getKey(), shardEntry.getValue());
+                    }
+                    shardMapBuilder.put(shardEntry.getKey(), shardStatus);
+                }
+                snapshotMapBuilder.put(snapshotId, shardMapBuilder.build());
+            }
+            return new NodeSnapshotStatus(clusterService.localNode(), snapshotMapBuilder.build());
+        } catch (Exception e) {
+            throw new ElasticsearchException("failed to load metadata", e);
+        }
+    }
+
+    @Override
+    protected boolean accumulateExceptions() {
+        return true;
+    }
+
+    static class Request extends NodesOperationRequest<Request> {
+
+        private SnapshotId[] snapshotIds;
+
+        public Request() {
+        }
+
+        public Request(String[] nodesIds) {
+            super(nodesIds);
+        }
+
+        public Request snapshotIds(SnapshotId[] snapshotIds) {
+            this.snapshotIds = snapshotIds;
+            return this;
+        }
+
+        @Override
+        public void readFrom(StreamInput in) throws IOException {
+            // This operation is never executed remotely
+            throw new UnsupportedOperationException("shouldn't be here");
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            // This operation is never executed remotely
+            throw new UnsupportedOperationException("shouldn't be here");
+        }
+    }
+
+    public static class NodesSnapshotStatus extends NodesOperationResponse<NodeSnapshotStatus> {
+
+        private FailedNodeException[] failures;
+
+        NodesSnapshotStatus() {
+        }
+
+        public NodesSnapshotStatus(ClusterName clusterName, NodeSnapshotStatus[] nodes, FailedNodeException[] failures) {
+            super(clusterName, nodes);
+            this.failures = failures;
+        }
+
+        public FailedNodeException[] failures() {
+            return failures;
+        }
+
+        @Override
+        public void readFrom(StreamInput in) throws IOException {
+            super.readFrom(in);
+            nodes = new NodeSnapshotStatus[in.readVInt()];
+            for (int i = 0; i < nodes.length; i++) {
+                nodes[i] = new NodeSnapshotStatus();
+                nodes[i].readFrom(in);
+            }
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            super.writeTo(out);
+            out.writeVInt(nodes.length);
+            for (NodeSnapshotStatus response : nodes) {
+                response.writeTo(out);
+            }
+        }
+    }
+
+
+    static class NodeRequest extends NodeOperationRequest {
+
+        private SnapshotId[] snapshotIds;
+
+        NodeRequest() {
+        }
+
+        NodeRequest(String nodeId, TransportNodesSnapshotsStatus.Request request) {
+            super(request, nodeId);
+            snapshotIds = request.snapshotIds;
+        }
+
+        @Override
+        public void readFrom(StreamInput in) throws IOException {
+            super.readFrom(in);
+            int n = in.readVInt();
+            snapshotIds = new SnapshotId[n];
+            for (int i = 0; i < n; i++) {
+                snapshotIds[i] = SnapshotId.readSnapshotId(in);
+            }
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            super.writeTo(out);
+            if (snapshotIds != null) {
+                out.writeVInt(snapshotIds.length);
+                for (int i = 0; i < snapshotIds.length; i++) {
+                    snapshotIds[i].writeTo(out);
+                }
+            } else {
+                out.writeVInt(0);
+            }
+        }
+    }
+
+    public static class NodeSnapshotStatus extends NodeOperationResponse {
+
+        private ImmutableMap<SnapshotId, ImmutableMap<ShardId, SnapshotIndexShardStatus>> status;
+
+        NodeSnapshotStatus() {
+        }
+
+        public NodeSnapshotStatus(DiscoveryNode node, ImmutableMap<SnapshotId, ImmutableMap<ShardId, SnapshotIndexShardStatus>> status) {
+            super(node);
+            this.status = status;
+        }
+
+        public ImmutableMap<SnapshotId, ImmutableMap<ShardId, SnapshotIndexShardStatus>> status() {
+            return status;
+        }
+
+        @Override
+        public void readFrom(StreamInput in) throws IOException {
+            super.readFrom(in);
+            int numberOfSnapshots = in.readVInt();
+            ImmutableMap.Builder<SnapshotId, ImmutableMap<ShardId, SnapshotIndexShardStatus>> snapshotMapBuilder = ImmutableMap.builder();
+            for (int i = 0; i < numberOfSnapshots; i++) {
+                SnapshotId snapshotId = SnapshotId.readSnapshotId(in);
+                ImmutableMap.Builder<ShardId, SnapshotIndexShardStatus> shardMapBuilder = ImmutableMap.builder();
+                int numberOfShards = in.readVInt();
+                for (int j = 0; j < numberOfShards; j++) {
+                    ShardId shardId =  ShardId.readShardId(in);
+                    SnapshotIndexShardStatus status = SnapshotIndexShardStatus.readShardSnapshotStatus(in);
+                    shardMapBuilder.put(shardId, status);
+                }
+                snapshotMapBuilder.put(snapshotId, shardMapBuilder.build());
+            }
+            status = snapshotMapBuilder.build();
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            super.writeTo(out);
+            if (status != null) {
+                out.writeVInt(status.size());
+                for (ImmutableMap.Entry<SnapshotId, ImmutableMap<ShardId, SnapshotIndexShardStatus>> entry : status.entrySet()) {
+                    entry.getKey().writeTo(out);
+                    out.writeVInt(entry.getValue().size());
+                    for (ImmutableMap.Entry<ShardId, SnapshotIndexShardStatus> shardEntry : entry.getValue().entrySet()) {
+                        shardEntry.getKey().writeTo(out);
+                        shardEntry.getValue().writeTo(out);
+                    }
+                }
+            } else {
+                out.writeVInt(0);
+            }
+        }
+    }
+}

+ 222 - 0
src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java

@@ -0,0 +1,222 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.action.admin.cluster.snapshots.status;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.ElasticsearchIllegalArgumentException;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
+import org.elasticsearch.cluster.ClusterService;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.SnapshotId;
+import org.elasticsearch.cluster.metadata.SnapshotMetaData;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
+import org.elasticsearch.snapshots.Snapshot;
+import org.elasticsearch.snapshots.SnapshotsService;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.TransportService;
+
+import java.util.Map;
+import java.util.Set;
+
+import static com.google.common.collect.Maps.newHashMap;
+import static com.google.common.collect.Sets.newHashSet;
+
+/**
+ */
+public class TransportSnapshotsStatusAction extends TransportMasterNodeOperationAction<SnapshotsStatusRequest, SnapshotsStatusResponse> {
+
+    private final SnapshotsService snapshotsService;
+
+    private final TransportNodesSnapshotsStatus transportNodesSnapshotsStatus;
+
+    @Inject
+    public TransportSnapshotsStatusAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, SnapshotsService snapshotsService, TransportNodesSnapshotsStatus transportNodesSnapshotsStatus) {
+        super(settings, transportService, clusterService, threadPool);
+        this.snapshotsService = snapshotsService;
+        this.transportNodesSnapshotsStatus = transportNodesSnapshotsStatus;
+    }
+
+    @Override
+    protected String executor() {
+        return ThreadPool.Names.GENERIC;
+    }
+
+    @Override
+    protected String transportAction() {
+        return SnapshotsStatusAction.NAME;
+    }
+
+    @Override
+    protected SnapshotsStatusRequest newRequest() {
+        return new SnapshotsStatusRequest();
+    }
+
+    @Override
+    protected SnapshotsStatusResponse newResponse() {
+        return new SnapshotsStatusResponse();
+    }
+
+    @Override
+    protected void masterOperation(final SnapshotsStatusRequest request,
+                                   final ClusterState state,
+                                   final ActionListener<SnapshotsStatusResponse> listener) throws ElasticsearchException {
+        ImmutableList<SnapshotMetaData.Entry> currentSnapshots = snapshotsService.currentSnapshots(request.repository(), request.snapshots());
+
+        if (currentSnapshots.isEmpty()) {
+            buildResponse(request, currentSnapshots, null);
+        }
+
+        Set<String> nodesIds = newHashSet();
+        for (SnapshotMetaData.Entry entry : currentSnapshots) {
+            for (SnapshotMetaData.ShardSnapshotStatus status : entry.shards().values()) {
+                if (status.nodeId() != null) {
+                    nodesIds.add(status.nodeId());
+                }
+            }
+        }
+
+        if (!nodesIds.isEmpty()) {
+            // There are still some snapshots running - check their progress
+            SnapshotId[] snapshotIds = new SnapshotId[currentSnapshots.size()];
+            for (int i = 0; i < currentSnapshots.size(); i++) {
+                snapshotIds[i] = currentSnapshots.get(i).snapshotId();
+            }
+
+            transportNodesSnapshotsStatus.status(nodesIds.toArray(new String[nodesIds.size()]),
+                    snapshotIds, request.masterNodeTimeout(), new ActionListener<TransportNodesSnapshotsStatus.NodesSnapshotStatus>() {
+                @Override
+                public void onResponse(TransportNodesSnapshotsStatus.NodesSnapshotStatus nodeSnapshotStatuses) {
+                    ImmutableList<SnapshotMetaData.Entry> currentSnapshots =
+                            snapshotsService.currentSnapshots(request.repository(), request.snapshots());
+                    listener.onResponse(buildResponse(request, currentSnapshots, nodeSnapshotStatuses));
+                }
+
+                @Override
+                public void onFailure(Throwable e) {
+                    listener.onFailure(e);
+                }
+            });
+        } else {
+            // We don't have any in-progress shards, just return current stats
+            listener.onResponse(buildResponse(request, currentSnapshots, null));
+        }
+
+    }
+
+    private SnapshotsStatusResponse buildResponse(SnapshotsStatusRequest request, ImmutableList<SnapshotMetaData.Entry> currentSnapshots,
+                                                  TransportNodesSnapshotsStatus.NodesSnapshotStatus nodeSnapshotStatuses) {
+        // First process snapshot that are currently processed
+        ImmutableList.Builder<SnapshotStatus> builder = ImmutableList.builder();
+        Set<SnapshotId> currentSnapshotIds = newHashSet();
+        if (!currentSnapshots.isEmpty()) {
+            Map<String, TransportNodesSnapshotsStatus.NodeSnapshotStatus> nodeSnapshotStatusMap;
+            if (nodeSnapshotStatuses != null) {
+                nodeSnapshotStatusMap = nodeSnapshotStatuses.getNodesMap();
+            } else {
+                nodeSnapshotStatusMap = newHashMap();
+            }
+
+            for (SnapshotMetaData.Entry entry : currentSnapshots) {
+                currentSnapshotIds.add(entry.snapshotId());
+                ImmutableList.Builder<SnapshotIndexShardStatus> shardStatusBuilder = ImmutableList.builder();
+                for (ImmutableMap.Entry<ShardId, SnapshotMetaData.ShardSnapshotStatus> shardEntry : entry.shards().entrySet()) {
+                    SnapshotMetaData.ShardSnapshotStatus status = shardEntry.getValue();
+                    if (status.nodeId() != null) {
+                        // We should have information about this shard from the shard:
+                        TransportNodesSnapshotsStatus.NodeSnapshotStatus nodeStatus = nodeSnapshotStatusMap.get(status.nodeId());
+                        if (nodeStatus != null) {
+                            ImmutableMap<ShardId, SnapshotIndexShardStatus> shardStatues = nodeStatus.status().get(entry.snapshotId());
+                            if (shardStatues != null) {
+                                SnapshotIndexShardStatus shardStatus = shardStatues.get(shardEntry.getKey());
+                                if (shardStatus != null) {
+                                    // We have full information about this shard
+                                    shardStatusBuilder.add(shardStatus);
+                                    continue;
+                                }
+                            }
+                        }
+                    }
+                    final SnapshotIndexShardStage stage;
+                    switch (shardEntry.getValue().state()) {
+                        case FAILED:
+                        case ABORTED:
+                        case MISSING:
+                            stage = SnapshotIndexShardStage.FAILURE;
+                            break;
+                        case INIT:
+                        case STARTED:
+                            stage = SnapshotIndexShardStage.STARTED;
+                            break;
+                        case SUCCESS:
+                            stage = SnapshotIndexShardStage.DONE;
+                            break;
+                        default:
+                            throw new ElasticsearchIllegalArgumentException("Unknown snapshot state " + shardEntry.getValue().state());
+                    }
+                    SnapshotIndexShardStatus shardStatus = new SnapshotIndexShardStatus(shardEntry.getKey().getIndex(), shardEntry.getKey().getId(), stage);
+                    shardStatusBuilder.add(shardStatus);
+                }
+                builder.add(new SnapshotStatus(entry.snapshotId(), entry.state(), shardStatusBuilder.build()));
+            }
+        }
+        // Now add snapshots on disk that are not currently running
+        if (Strings.hasText(request.repository())) {
+            if (request.snapshots() != null && request.snapshots().length > 0) {
+                for (String snapshotName : request.snapshots()) {
+                    SnapshotId snapshotId = new SnapshotId(request.repository(), snapshotName);
+                    if (currentSnapshotIds.contains(snapshotId)) {
+                        // This is a snapshot the is currently running - skipping
+                        continue;
+                    }
+                    Snapshot snapshot = snapshotsService.snapshot(snapshotId);
+                    ImmutableList.Builder<SnapshotIndexShardStatus> shardStatusBuilder = ImmutableList.builder();
+                    if (snapshot.state().completed()) {
+                        ImmutableMap<ShardId, IndexShardSnapshotStatus> shardStatues = snapshotsService.snapshotShards(snapshotId);
+                        for (ImmutableMap.Entry<ShardId, IndexShardSnapshotStatus> shardStatus : shardStatues.entrySet()) {
+                            shardStatusBuilder.add(new SnapshotIndexShardStatus(shardStatus.getKey(), shardStatus.getValue()));
+                        }
+                        final SnapshotMetaData.State state;
+                        switch (snapshot.state()) {
+                            case FAILED:
+                                state = SnapshotMetaData.State.FAILED;
+                                break;
+                            case SUCCESS:
+                                state = SnapshotMetaData.State.SUCCESS;
+                                break;
+                            default:
+                                throw new ElasticsearchIllegalArgumentException("Unknown snapshot state " + snapshot.state());
+                        }
+                        builder.add(new SnapshotStatus(snapshotId, state, shardStatusBuilder.build()));
+                    }
+                }
+            }
+        }
+
+        return new SnapshotsStatusResponse(builder.build());
+    }
+
+}

+ 18 - 0
src/main/java/org/elasticsearch/client/ClusterAdminClient.java

@@ -69,6 +69,9 @@ import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse
 import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest;
 import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequestBuilder;
 import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
+import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusRequest;
+import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusRequestBuilder;
+import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateRequestBuilder;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
@@ -429,4 +432,19 @@ public interface ClusterAdminClient {
      */
     PendingClusterTasksRequestBuilder preparePendingClusterTasks();
 
+    /**
+     * Get snapshot status.
+     */
+    ActionFuture<SnapshotsStatusResponse> snapshotsStatus(SnapshotsStatusRequest request);
+
+    /**
+     * Get snapshot status.
+     */
+    void snapshotsStatus(SnapshotsStatusRequest request, ActionListener<SnapshotsStatusResponse> listener);
+
+    /**
+     * Get snapshot status.
+     */
+    SnapshotsStatusRequestBuilder prepareSnapshotStatus(String repository);
+
 }

+ 13 - 1
src/main/java/org/elasticsearch/client/Requests.java

@@ -34,6 +34,7 @@ import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotReq
 import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest;
 import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsRequest;
 import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest;
+import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusRequest;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
 import org.elasticsearch.action.admin.cluster.stats.ClusterStatsRequest;
 import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
@@ -536,7 +537,7 @@ public class Requests {
     }
 
     /**
-     * Restores new snapshot
+     * Deletes a snapshot
      *
      * @param snapshot   snapshot name
      * @param repository repository name
@@ -545,4 +546,15 @@ public class Requests {
     public static DeleteSnapshotRequest deleteSnapshotRequest(String repository, String snapshot) {
         return new DeleteSnapshotRequest(repository, snapshot);
     }
+
+    /**
+     *  Get status of snapshots
+     *
+     * @param repository repository name
+     * @return snapshot status request
+     */
+    public static SnapshotsStatusRequest snapshotsStatusRequest(String repository) {
+        return new SnapshotsStatusRequest(repository);
+    }
+
 }

+ 20 - 0
src/main/java/org/elasticsearch/client/support/AbstractClusterAdminClient.java

@@ -85,6 +85,10 @@ import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotA
 import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest;
 import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequestBuilder;
 import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
+import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusAction;
+import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusRequest;
+import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusRequestBuilder;
+import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateRequestBuilder;
@@ -399,4 +403,20 @@ public abstract class AbstractClusterAdminClient implements InternalClusterAdmin
     public RestoreSnapshotRequestBuilder prepareRestoreSnapshot(String repository, String snapshot) {
         return new RestoreSnapshotRequestBuilder(this, repository, snapshot);
     }
+
+
+    @Override
+    public ActionFuture<SnapshotsStatusResponse> snapshotsStatus(SnapshotsStatusRequest request) {
+        return execute(SnapshotsStatusAction.INSTANCE, request);
+    }
+
+    @Override
+    public void snapshotsStatus(SnapshotsStatusRequest request, ActionListener<SnapshotsStatusResponse> listener) {
+        execute(SnapshotsStatusAction.INSTANCE, request, listener);
+    }
+
+    @Override
+    public SnapshotsStatusRequestBuilder prepareSnapshotStatus(String repository) {
+        return new SnapshotsStatusRequestBuilder(this, repository);
+    }
 }

+ 2 - 0
src/main/java/org/elasticsearch/gateway/local/state/meta/TransportNodesListGatewayMetaState.java

@@ -107,6 +107,8 @@ public class TransportNodesListGatewayMetaState extends TransportNodesOperationA
                 nodesList.add((NodeLocalGatewayMetaState) resp);
             } else if (resp instanceof FailedNodeException) {
                 failures.add((FailedNodeException) resp);
+            } else {
+                logger.warn("unknown response type [{}], expected NodeLocalGatewayMetaState or FailedNodeException", resp);
             }
         }
         return new NodesLocalGatewayMetaState(clusterName, nodesList.toArray(new NodeLocalGatewayMetaState[nodesList.size()]),

+ 2 - 0
src/main/java/org/elasticsearch/gateway/local/state/shards/TransportNodesListGatewayStartedShards.java

@@ -108,6 +108,8 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesOperat
                 nodesList.add((NodeLocalGatewayStartedShards) resp);
             } else if (resp instanceof FailedNodeException) {
                 failures.add((FailedNodeException) resp);
+            } else {
+                logger.warn("unknown response type [{}], expected NodeLocalGatewayStartedShards or FailedNodeException", resp);
             }
         }
         return new NodesLocalGatewayStartedShards(clusterName, nodesList.toArray(new NodeLocalGatewayStartedShards[nodesList.size()]),

+ 9 - 0
src/main/java/org/elasticsearch/index/snapshots/IndexShardRepository.java

@@ -60,4 +60,13 @@ public interface IndexShardRepository {
      */
     void restore(SnapshotId snapshotId, ShardId shardId, ShardId snapshotShardId, RecoveryStatus recoveryStatus);
 
+    /**
+     * Retrieve shard snapshot status for the stored snapshot
+     *
+     * @param snapshotId snapshot id
+     * @param shardId    shard id
+     * @return snapshot status
+     */
+    IndexShardSnapshotStatus snapshotStatus(SnapshotId snapshotId, ShardId shardId);
+
 }

+ 61 - 0
src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotStatus.java

@@ -19,6 +19,9 @@
 
 package org.elasticsearch.index.snapshots;
 
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
 /**
  * Represent shard snapshot status
  */
@@ -58,12 +61,18 @@ public class IndexShardSnapshotStatus {
 
     private int numberOfFiles;
 
+    private volatile int processedFiles;
+
     private long totalSize;
 
+    private volatile long processedSize;
+
     private long indexVersion;
 
     private boolean aborted;
 
+    private String failure;
+
     /**
      * Returns current snapshot stage
      *
@@ -145,6 +154,25 @@ public class IndexShardSnapshotStatus {
         this.totalSize = totalSize;
     }
 
+    /**
+     * Sets processed files stats
+     *
+     * @param numberOfFiles number of files in this snapshot
+     * @param totalSize     total size of files in this snapshot
+     */
+    public synchronized void processedFiles(int numberOfFiles, long totalSize) {
+        processedFiles = numberOfFiles;
+        processedSize = totalSize;
+    }
+
+    /**
+     * Increments number of processed files
+     */
+    public synchronized void addProcessedFile(long size) {
+        processedFiles++;
+        processedSize += size;
+    }
+
     /**
      * Number of files
      *
@@ -163,6 +191,25 @@ public class IndexShardSnapshotStatus {
         return totalSize;
     }
 
+    /**
+     * Number of processed files
+     *
+     * @return number of processed files
+     */
+    public int processedFiles() {
+        return processedFiles;
+    }
+
+    /**
+     * Size of processed files
+     *
+     * @return size of processed files
+     */
+    public long processedSize() {
+        return processedSize;
+    }
+
+
     /**
      * Sets index version
      *
@@ -180,4 +227,18 @@ public class IndexShardSnapshotStatus {
     public long indexVersion() {
         return indexVersion;
     }
+
+    /**
+     * Sets the reason for the failure if the snapshot is in the {@link IndexShardSnapshotStatus.Stage#FAILURE} state
+     */
+    public void failure(String failure) {
+        this.failure = failure;
+    }
+
+    /**
+     * Returns the reason for the failure if the snapshot is in the {@link IndexShardSnapshotStatus.Stage#FAILURE} state
+     */
+    public String failure() {
+        return failure;
+    }
 }

+ 60 - 19
src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java

@@ -26,6 +26,7 @@ import org.apache.lucene.store.IndexInput;
 import org.apache.lucene.store.IndexOutput;
 import org.apache.lucene.store.RateLimiter;
 import org.apache.lucene.util.IOUtils;
+import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.cluster.metadata.SnapshotId;
 import org.elasticsearch.common.blobstore.*;
 import org.elasticsearch.common.component.AbstractComponent;
@@ -50,6 +51,7 @@ import org.elasticsearch.repositories.RepositoryName;
 import java.io.FilterInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -130,6 +132,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
         } catch (Throwable e) {
             snapshotStatus.time(System.currentTimeMillis() - snapshotStatus.startTime());
             snapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.FAILURE);
+            snapshotStatus.failure(ExceptionsHelper.detailedMessage(e));
             if (e instanceof IndexShardSnapshotFailedException) {
                 throw (IndexShardSnapshotFailedException) e;
             } else {
@@ -154,6 +157,23 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
         }
     }
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public IndexShardSnapshotStatus snapshotStatus(SnapshotId snapshotId, ShardId shardId) {
+        Context context = new Context(snapshotId, shardId);
+        BlobStoreIndexShardSnapshot snapshot = context.loadSnapshot();
+        IndexShardSnapshotStatus status = new IndexShardSnapshotStatus();
+        status.updateStage(IndexShardSnapshotStatus.Stage.DONE);
+        status.startTime(snapshot.startTime());
+        status.files(snapshot.numberOfFiles(), snapshot.totalSize());
+        // The snapshot is done which means the number of processed files is the same as total
+        status.processedFiles(snapshot.numberOfFiles(), snapshot.totalSize());
+        status.time(snapshot.time());
+        return status;
+    }
+
     /**
      * Delete shard snapshot
      *
@@ -266,6 +286,19 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
             cleanup(newSnapshotsList, blobs);
         }
 
+        /**
+         * Loads information about shard snapshot
+         */
+        public BlobStoreIndexShardSnapshot loadSnapshot() {
+            BlobStoreIndexShardSnapshot snapshot;
+            try {
+                snapshot = readSnapshot(blobContainer.readBlobFully(snapshotBlobName(snapshotId)));
+            } catch (IOException ex) {
+                throw new IndexShardRestoreFailedException(shardId, "failed to read shard snapshot file", ex);
+            }
+            return snapshot;
+        }
+
         /**
          * Removes all unreferenced files from the repository
          *
@@ -343,6 +376,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
             }
             return new BlobStoreIndexShardSnapshots(snapshots);
         }
+
     }
 
     /**
@@ -370,7 +404,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
         /**
          * Create snapshot from index commit point
          *
-         * @param snapshotIndexCommit
+         * @param snapshotIndexCommit snapshot commit point
          */
         public void snapshot(SnapshotIndexCommit snapshotIndexCommit) {
             logger.debug("[{}] [{}] snapshot to [{}] ...", shardId, snapshotId, repositoryName);
@@ -385,14 +419,12 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
             long generation = findLatestFileNameGeneration(blobs);
             BlobStoreIndexShardSnapshots snapshots = buildBlobStoreIndexShardSnapshots(blobs);
 
-            snapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.STARTED);
-
-            final CountDownLatch indexLatch = new CountDownLatch(snapshotIndexCommit.getFiles().length);
             final CopyOnWriteArrayList<Throwable> failures = new CopyOnWriteArrayList<Throwable>();
             final List<BlobStoreIndexShardSnapshot.FileInfo> indexCommitPointFiles = newArrayList();
 
             int indexNumberOfFiles = 0;
             long indexTotalFilesSize = 0;
+            ArrayList<FileInfo> filesToSnapshot = newArrayList();
             for (String fileName : snapshotIndexCommit.getFiles()) {
                 if (snapshotStatus.aborted()) {
                     logger.debug("[{}] [{}] Aborted on the file [{}], exiting", shardId, snapshotId, fileName);
@@ -423,20 +455,28 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
                     indexNumberOfFiles++;
                     indexTotalFilesSize += md.length();
                     // create a new FileInfo
-                    try {
-                        BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo = new BlobStoreIndexShardSnapshot.FileInfo(fileNameFromGeneration(++generation), fileName, md.length(), chunkSize, md.checksum());
-                        indexCommitPointFiles.add(snapshotFileInfo);
-                        snapshotFile(snapshotFileInfo, indexLatch, failures);
-                    } catch (IOException e) {
-                        failures.add(e);
-                    }
+                    BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo = new BlobStoreIndexShardSnapshot.FileInfo(fileNameFromGeneration(++generation), fileName, md.length(), chunkSize, md.checksum());
+                    indexCommitPointFiles.add(snapshotFileInfo);
+                    filesToSnapshot.add(snapshotFileInfo);
                 } else {
                     indexCommitPointFiles.add(fileInfo);
-                    indexLatch.countDown();
                 }
             }
 
             snapshotStatus.files(indexNumberOfFiles, indexTotalFilesSize);
+
+            snapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.STARTED);
+
+            final CountDownLatch indexLatch = new CountDownLatch(filesToSnapshot.size());
+
+            for (FileInfo snapshotFileInfo : filesToSnapshot) {
+                try {
+                    snapshotFile(snapshotFileInfo, indexLatch, failures);
+                } catch (IOException e) {
+                    failures.add(e);
+                }
+            }
+
             snapshotStatus.indexVersion(snapshotIndexCommit.getGeneration());
 
             try {
@@ -453,7 +493,11 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
             snapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.FINALIZE);
 
             String commitPointName = snapshotBlobName(snapshotId);
-            BlobStoreIndexShardSnapshot snapshot = new BlobStoreIndexShardSnapshot(snapshotId.getSnapshot(), snapshotIndexCommit.getGeneration(), indexCommitPointFiles);
+            BlobStoreIndexShardSnapshot snapshot = new BlobStoreIndexShardSnapshot(snapshotId.getSnapshot(),
+                    snapshotIndexCommit.getGeneration(), indexCommitPointFiles, snapshotStatus.startTime(),
+                    // snapshotStatus.startTime() is assigned on the same machine, so it's safe to use with VLong
+                    System.currentTimeMillis() - snapshotStatus.startTime(), indexNumberOfFiles, indexTotalFilesSize);
+            //TODO: The time stored in snapshot doesn't include cleanup time.
             try {
                 byte[] snapshotData = writeSnapshot(snapshot);
                 logger.trace("[{}] [{}] writing shard snapshot file", shardId, snapshotId);
@@ -506,6 +550,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
                         @Override
                         public void onCompleted() {
                             IOUtils.closeWhileHandlingException(fIndexInput);
+                            snapshotStatus.addProcessedFile(fileInfo.length());
                             if (counter.decrementAndGet() == 0) {
                                 latch.countDown();
                             }
@@ -514,6 +559,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
                         @Override
                         public void onFailure(Throwable t) {
                             IOUtils.closeWhileHandlingException(fIndexInput);
+                            snapshotStatus.addProcessedFile(0);
                             failures.add(t);
                             if (counter.decrementAndGet() == 0) {
                                 latch.countDown();
@@ -613,12 +659,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
          */
         public void restore() {
             logger.debug("[{}] [{}] restoring to [{}] ...", snapshotId, repositoryName, shardId);
-            BlobStoreIndexShardSnapshot snapshot;
-            try {
-                snapshot = readSnapshot(blobContainer.readBlobFully(snapshotBlobName(snapshotId)));
-            } catch (IOException ex) {
-                throw new IndexShardRestoreFailedException(shardId, "failed to read shard snapshot file", ex);
-            }
+            BlobStoreIndexShardSnapshot snapshot = loadSnapshot();
 
             recoveryStatus.updateStage(RecoveryStatus.Stage.INDEX);
             int numberOfFiles = 0;

+ 101 - 12
src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardSnapshot.java

@@ -22,6 +22,7 @@ package org.elasticsearch.index.snapshots.blobstore;
 import com.google.common.collect.ImmutableList;
 import org.elasticsearch.ElasticsearchParseException;
 import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.ParseField;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.xcontent.ToXContent;
 import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -269,21 +270,38 @@ public class BlobStoreIndexShardSnapshot {
 
     private final long indexVersion;
 
+    private final long startTime;
+
+    private final long time;
+
+    private final int numberOfFiles;
+
+    private final long totalSize;
+
     private final ImmutableList<FileInfo> indexFiles;
 
     /**
      * Constructs new shard snapshot metadata from snapshot metadata
      *
-     * @param snapshot     snapshot id
-     * @param indexVersion index version
-     * @param indexFiles   list of files in the shard
+     * @param snapshot      snapshot id
+     * @param indexVersion  index version
+     * @param indexFiles    list of files in the shard
+     * @param startTime     snapshot start time
+     * @param time          snapshot running time
+     * @param numberOfFiles number of files that where snapshotted
+     * @param totalSize     total size of all files snapshotted
      */
-    public BlobStoreIndexShardSnapshot(String snapshot, long indexVersion, List<FileInfo> indexFiles) {
+    public BlobStoreIndexShardSnapshot(String snapshot, long indexVersion, List<FileInfo> indexFiles, long startTime, long time,
+                                       int numberOfFiles, long totalSize) {
         assert snapshot != null;
         assert indexVersion >= 0;
         this.snapshot = snapshot;
         this.indexVersion = indexVersion;
         this.indexFiles = ImmutableList.copyOf(indexFiles);
+        this.startTime = startTime;
+        this.time = time;
+        this.numberOfFiles = numberOfFiles;
+        this.totalSize = totalSize;
     }
 
     /**
@@ -313,6 +331,55 @@ public class BlobStoreIndexShardSnapshot {
         return indexFiles;
     }
 
+    /**
+     * Returns snapshot start time
+     */
+    public long startTime() {
+        return startTime;
+    }
+
+    /**
+     * Returns snapshot running time
+     */
+    public long time() {
+        return time;
+    }
+
+    /**
+     * Returns number of files that where snapshotted
+     */
+    public int numberOfFiles() {
+        return numberOfFiles;
+    }
+
+    /**
+     * Returns total size of all files that where snapshotted
+     */
+    public long totalSize() {
+        return totalSize;
+    }
+
+    static final class Fields {
+        static final XContentBuilderString NAME = new XContentBuilderString("name");
+        static final XContentBuilderString INDEX_VERSION = new XContentBuilderString("index_version");
+        static final XContentBuilderString START_TIME = new XContentBuilderString("start_time");
+        static final XContentBuilderString TIME = new XContentBuilderString("time");
+        static final XContentBuilderString NUMBER_OF_FILES = new XContentBuilderString("number_of_files");
+        static final XContentBuilderString TOTAL_SIZE = new XContentBuilderString("total_size");
+        static final XContentBuilderString FILES = new XContentBuilderString("files");
+    }
+
+    static final class ParseFields {
+        static final ParseField NAME = new ParseField("name");
+        static final ParseField INDEX_VERSION = new ParseField("index_version", "index-version");
+        static final ParseField START_TIME = new ParseField("start_time");
+        static final ParseField TIME = new ParseField("time");
+        static final ParseField NUMBER_OF_FILES = new ParseField("number_of_files");
+        static final ParseField TOTAL_SIZE = new ParseField("total_size");
+        static final ParseField FILES = new ParseField("files");
+    }
+
+
     /**
      * Serializes shard snapshot metadata info into JSON
      *
@@ -323,9 +390,13 @@ public class BlobStoreIndexShardSnapshot {
      */
     public static void toXContent(BlobStoreIndexShardSnapshot snapshot, XContentBuilder builder, ToXContent.Params params) throws IOException {
         builder.startObject();
-        builder.field("name", snapshot.snapshot);
-        builder.field("index-version", snapshot.indexVersion);
-        builder.startArray("files");
+        builder.field(Fields.NAME, snapshot.snapshot);
+        builder.field(Fields.INDEX_VERSION, snapshot.indexVersion);
+        builder.field(Fields.START_TIME, snapshot.startTime);
+        builder.field(Fields.TIME, snapshot.time);
+        builder.field(Fields.NUMBER_OF_FILES, snapshot.numberOfFiles);
+        builder.field(Fields.TOTAL_SIZE, snapshot.totalSize);
+        builder.startArray(Fields.FILES);
         for (FileInfo fileInfo : snapshot.indexFiles) {
             FileInfo.toXContent(fileInfo, builder, params);
         }
@@ -344,6 +415,10 @@ public class BlobStoreIndexShardSnapshot {
 
         String snapshot = null;
         long indexVersion = -1;
+        long startTime = 0;
+        long time = 0;
+        int numberOfFiles = 0;
+        long totalSize = 0;
 
         List<FileInfo> indexFiles = newArrayList();
 
@@ -354,16 +429,29 @@ public class BlobStoreIndexShardSnapshot {
                     String currentFieldName = parser.currentName();
                     token = parser.nextToken();
                     if (token.isValue()) {
-                        if ("name".equals(currentFieldName)) {
+                        if (ParseFields.NAME.match(currentFieldName)) {
                             snapshot = parser.text();
-                        } else if ("index-version".equals(currentFieldName)) {
+                        } else if (ParseFields.INDEX_VERSION.match(currentFieldName)) {
+                            // The index-version is needed for backward compatibility with v 1.0
                             indexVersion = parser.longValue();
+                        } else if (ParseFields.START_TIME.match(currentFieldName)) {
+                            startTime = parser.longValue();
+                        } else if (ParseFields.TIME.match(currentFieldName)) {
+                            time = parser.longValue();
+                        } else if (ParseFields.NUMBER_OF_FILES.match(currentFieldName)) {
+                            numberOfFiles = parser.intValue();
+                        } else if (ParseFields.TOTAL_SIZE.match(currentFieldName)) {
+                            totalSize = parser.longValue();
                         } else {
                             throw new ElasticsearchParseException("unknown parameter [" + currentFieldName + "]");
                         }
                     } else if (token == XContentParser.Token.START_ARRAY) {
-                        while ((parser.nextToken()) != XContentParser.Token.END_ARRAY) {
-                            indexFiles.add(FileInfo.fromXContent(parser));
+                        if (ParseFields.FILES.match(currentFieldName)) {
+                            while ((parser.nextToken()) != XContentParser.Token.END_ARRAY) {
+                                indexFiles.add(FileInfo.fromXContent(parser));
+                            }
+                        } else {
+                            throw new ElasticsearchParseException("unknown parameter [" + currentFieldName + "]");
                         }
                     } else {
                         throw new ElasticsearchParseException("unexpected token  [" + token + "]");
@@ -373,7 +461,8 @@ public class BlobStoreIndexShardSnapshot {
                 }
             }
         }
-        return new BlobStoreIndexShardSnapshot(snapshot, indexVersion, ImmutableList.<FileInfo>copyOf(indexFiles));
+        return new BlobStoreIndexShardSnapshot(snapshot, indexVersion, ImmutableList.<FileInfo>copyOf(indexFiles),
+                startTime, time, numberOfFiles, totalSize);
     }
 
     /**

+ 2 - 0
src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java

@@ -116,6 +116,8 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesOperatio
                 nodeStoreFilesMetaDatas.add((NodeStoreFilesMetaData) resp);
             } else if (resp instanceof FailedNodeException) {
                 failures.add((FailedNodeException) resp);
+            } else {
+                logger.warn("unknown response type [{}], expected NodeStoreFilesMetaData or FailedNodeException", resp);
             }
         }
         return new NodesStoreFilesMetaData(clusterName, nodeStoreFilesMetaDatas.toArray(new NodeStoreFilesMetaData[nodeStoreFilesMetaDatas.size()]),

+ 2 - 0
src/main/java/org/elasticsearch/repositories/RepositoriesModule.java

@@ -21,6 +21,7 @@ package org.elasticsearch.repositories;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
+import org.elasticsearch.action.admin.cluster.snapshots.status.TransportNodesSnapshotsStatus;
 import org.elasticsearch.common.inject.AbstractModule;
 import org.elasticsearch.common.inject.Module;
 import org.elasticsearch.repositories.fs.FsRepository;
@@ -61,6 +62,7 @@ public class RepositoriesModule extends AbstractModule {
     protected void configure() {
         bind(RepositoriesService.class).asEagerSingleton();
         bind(SnapshotsService.class).asEagerSingleton();
+        bind(TransportNodesSnapshotsStatus.class).asEagerSingleton();
         bind(RestoreService.class).asEagerSingleton();
         bind(RepositoryTypesRegistry.class).toInstance(new RepositoryTypesRegistry(ImmutableMap.copyOf(repositoryTypes)));
     }

+ 2 - 0
src/main/java/org/elasticsearch/rest/action/RestActionModule.java

@@ -40,6 +40,7 @@ import org.elasticsearch.rest.action.admin.cluster.snapshots.create.RestCreateSn
 import org.elasticsearch.rest.action.admin.cluster.snapshots.delete.RestDeleteSnapshotAction;
 import org.elasticsearch.rest.action.admin.cluster.snapshots.get.RestGetSnapshotsAction;
 import org.elasticsearch.rest.action.admin.cluster.snapshots.restore.RestRestoreSnapshotAction;
+import org.elasticsearch.rest.action.admin.cluster.snapshots.status.RestSnapshotsStatusAction;
 import org.elasticsearch.rest.action.admin.cluster.state.RestClusterStateAction;
 import org.elasticsearch.rest.action.admin.cluster.stats.RestClusterStatsAction;
 import org.elasticsearch.rest.action.admin.cluster.tasks.RestPendingClusterTasksAction;
@@ -141,6 +142,7 @@ public class RestActionModule extends AbstractModule {
         bind(RestCreateSnapshotAction.class).asEagerSingleton();
         bind(RestRestoreSnapshotAction.class).asEagerSingleton();
         bind(RestDeleteSnapshotAction.class).asEagerSingleton();
+        bind(RestSnapshotsStatusAction.class).asEagerSingleton();
 
         bind(RestIndicesExistsAction.class).asEagerSingleton();
         bind(RestTypesExistsAction.class).asEagerSingleton();

+ 77 - 0
src/main/java/org/elasticsearch/rest/action/admin/cluster/snapshots/status/RestSnapshotsStatusAction.java

@@ -0,0 +1,77 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.rest.action.admin.cluster.snapshots.status;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusRequest;
+import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.rest.*;
+import org.elasticsearch.rest.action.support.RestXContentBuilder;
+
+import java.io.IOException;
+
+import static org.elasticsearch.client.Requests.snapshotsStatusRequest;
+import static org.elasticsearch.rest.RestRequest.Method.GET;
+import static org.elasticsearch.rest.RestStatus.OK;
+
+/**
+ * Returns status of currently running snapshot
+ */
+public class RestSnapshotsStatusAction extends BaseRestHandler {
+
+    @Inject
+    public RestSnapshotsStatusAction(Settings settings, Client client, RestController controller) {
+        super(settings, client);
+        controller.registerHandler(GET, "/_snapshot/{repository}/{snapshot}/_status", this);
+        controller.registerHandler(GET, "/_snapshot/{repository}/_status", this);
+        controller.registerHandler(GET, "/_snapshot/_status", this);
+    }
+
+    @Override
+    public void handleRequest(final RestRequest request, final RestChannel channel) {
+        String repository = request.param("repository");
+        String[] snapshots = request.paramAsStringArray("snapshot", Strings.EMPTY_ARRAY);
+        if (snapshots.length == 1 && "_all".equalsIgnoreCase(snapshots[0])) {
+            snapshots = Strings.EMPTY_ARRAY;
+        }
+        SnapshotsStatusRequest snapshotsStatusResponse = snapshotsStatusRequest(repository).snapshots(snapshots);
+
+        snapshotsStatusResponse.masterNodeTimeout(request.paramAsTime("master_timeout", snapshotsStatusResponse.masterNodeTimeout()));
+        client.admin().cluster().snapshotsStatus(snapshotsStatusResponse, new AbstractRestResponseActionListener<SnapshotsStatusResponse>(request, channel, logger) {
+            @Override
+            public void onResponse(SnapshotsStatusResponse response) {
+                try {
+                    XContentBuilder builder = RestXContentBuilder.restContentBuilder(request);
+                    builder.startObject();
+                    response.toXContent(builder, request);
+                    builder.endObject();
+                    channel.sendResponse(new XContentRestResponse(request, OK, builder));
+                } catch (IOException e) {
+                    onFailure(e);
+                }
+            }
+        });
+    }
+}

+ 123 - 0
src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

@@ -42,6 +42,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.index.snapshots.IndexShardRepository;
 import org.elasticsearch.index.snapshots.IndexShardSnapshotAndRestoreService;
 import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
 import org.elasticsearch.indices.IndicesService;
@@ -365,6 +366,128 @@ public class SnapshotsService extends AbstractComponent implements ClusterStateL
         }
     }
 
+    /**
+     * Returns status of the currently running snapshots
+     * <p>
+     * This method is executed on master node
+     * </p>
+     *
+     * @param repository repository id
+     * @param snapshots  optional list of snapshots that will be used as a filter
+     * @return list of metadata for currently running snapshots
+     */
+    public ImmutableList<SnapshotMetaData.Entry> currentSnapshots(String repository, String[] snapshots) {
+        MetaData metaData = clusterService.state().metaData();
+        SnapshotMetaData snapshotMetaData = metaData.custom(SnapshotMetaData.TYPE);
+        if (snapshotMetaData == null || snapshotMetaData.entries().isEmpty()) {
+            return ImmutableList.of();
+        }
+        if (repository == null) {
+            return snapshotMetaData.entries();
+        }
+        if (snapshotMetaData.entries().size() == 1) {
+            // Most likely scenario - one snapshot is currently running
+            // Check this snapshot against the query
+            SnapshotMetaData.Entry entry = snapshotMetaData.entries().get(0);
+            if (!entry.snapshotId().getRepository().equals(repository)) {
+                return ImmutableList.of();
+            }
+            if (snapshots != null && snapshots.length > 0) {
+                for (String snapshot : snapshots) {
+                    if (entry.snapshotId().getSnapshot().equals(snapshot)) {
+                        return snapshotMetaData.entries();
+                    }
+                }
+                return ImmutableList.of();
+            } else {
+                return snapshotMetaData.entries();
+            }
+        }
+        ImmutableList.Builder<SnapshotMetaData.Entry> builder = ImmutableList.builder();
+        for (SnapshotMetaData.Entry entry : snapshotMetaData.entries()) {
+            if (!entry.snapshotId().getRepository().equals(repository)) {
+                continue;
+            }
+            if (snapshots != null && snapshots.length > 0) {
+                for (String snapshot : snapshots) {
+                    if (entry.snapshotId().getSnapshot().equals(snapshot)) {
+                        builder.add(entry);
+                        break;
+                    }
+                }
+            } else {
+                builder.add(entry);
+            }
+        }
+        return builder.build();
+    }
+
+    /**
+     * Returns status of shards that are snapshotted on the node and belong to the given snapshot
+     * <p>
+     * This method is executed on data node
+     * </p>
+     *
+     * @param snapshotId snapshot id
+     * @return map of shard id to snapshot status
+     */
+    public ImmutableMap<ShardId, IndexShardSnapshotStatus> currentSnapshotShards(SnapshotId snapshotId) {
+        SnapshotShards snapshotShards = shardSnapshots.get(snapshotId);
+        if (snapshotShards == null) {
+            return null;
+        } else {
+            return snapshotShards.shards;
+        }
+    }
+
+    /**
+     * Returns status of shards  currently finished snapshots
+     * <p>
+     * This method is executed on master node and it's complimentary to the {@link #currentSnapshotShards(SnapshotId)} becuase it
+     * returns simliar information but for already finished snapshots.
+     * </p>
+     *
+     * @param snapshotId snapshot id
+     * @return map of shard id to snapshot status
+     */
+    public ImmutableMap<ShardId, IndexShardSnapshotStatus> snapshotShards(SnapshotId snapshotId) {
+        ImmutableMap.Builder<ShardId, IndexShardSnapshotStatus> shardStatusBuilder = ImmutableMap.builder();
+        Repository repository = repositoriesService.repository(snapshotId.getRepository());
+        IndexShardRepository indexShardRepository = repositoriesService.indexShardRepository(snapshotId.getRepository());
+        Snapshot snapshot = repository.readSnapshot(snapshotId);
+        MetaData metaData = repository.readSnapshotMetaData(snapshotId, snapshot.indices());
+        for (String index : snapshot.indices()) {
+            IndexMetaData indexMetaData = metaData.indices().get(index);
+            if (indexMetaData != null) {
+                int numberOfShards = indexMetaData.getNumberOfShards();
+                for (int i = 0; i < numberOfShards; i++) {
+                    ShardId shardId = new ShardId(index, i);
+                    SnapshotShardFailure shardFailure = findShardFailure(snapshot.shardFailures(), shardId);
+                    if (shardFailure != null) {
+                        IndexShardSnapshotStatus shardSnapshotStatus = new IndexShardSnapshotStatus();
+                        shardSnapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.FAILURE);
+                        shardSnapshotStatus.failure(shardFailure.reason());
+                        shardStatusBuilder.put(shardId, shardSnapshotStatus);
+                    } else {
+                        IndexShardSnapshotStatus shardSnapshotStatus = indexShardRepository.snapshotStatus(snapshotId, shardId);
+                        shardStatusBuilder.put(shardId, shardSnapshotStatus);
+                    }
+                }
+            }
+        }
+        return shardStatusBuilder.build();
+    }
+
+
+    private SnapshotShardFailure findShardFailure(ImmutableList<SnapshotShardFailure> shardFailures, ShardId shardId) {
+        for (SnapshotShardFailure shardFailure : shardFailures) {
+            if (shardId.getIndex().equals(shardFailure.index()) && shardId.getId() == shardFailure.shardId()) {
+                return shardFailure;
+            }
+        }
+        return null;
+    }
+
     @Override
     public void clusterChanged(ClusterChangedEvent event) {
         try {

+ 108 - 1
src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreTests.java

@@ -28,12 +28,17 @@ import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRes
 import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotResponse;
 import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse;
 import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
+import org.elasticsearch.action.admin.cluster.snapshots.status.*;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
 import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
 import org.elasticsearch.action.count.CountResponse;
 import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.ClusterService;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.MappingMetaData;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.cluster.metadata.SnapshotMetaData;
+import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.collect.ImmutableOpenMap;
@@ -375,7 +380,7 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
                         ImmutableSettings.settingsBuilder()
                                 .put("location", newTempDir(LifecycleScope.TEST))
                                 .put("random", randomAsciiOfLength(10))
-                                .put("random_data_file_io_exception_rate", 0.1)));
+                                .put("random_data_file_io_exception_rate", 0.3)));
 
         createIndex("test-idx");
         ensureGreen();
@@ -390,9 +395,11 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
         logger.info("--> snapshot");
         CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test-idx").get();
         if (createSnapshotResponse.getSnapshotInfo().totalShards() == createSnapshotResponse.getSnapshotInfo().successfulShards()) {
+            logger.info("--> no failures");
             // If we are here, that means we didn't have any failures, let's check it
             assertThat(getFailureCount("test-repo"), equalTo(0L));
         } else {
+            logger.info("--> some failures");
             assertThat(getFailureCount("test-repo"), greaterThan(0L));
             assertThat(createSnapshotResponse.getSnapshotInfo().shardFailures().size(), greaterThan(0));
             for (SnapshotShardFailure shardFailure : createSnapshotResponse.getSnapshotInfo().shardFailures()) {
@@ -404,6 +411,28 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
             SnapshotInfo snapshotInfo = getSnapshotsResponse.getSnapshots().get(0);
             assertThat(snapshotInfo.shardFailures().size(), greaterThan(0));
             assertThat(snapshotInfo.totalShards(), greaterThan(snapshotInfo.successfulShards()));
+
+            // Verify that snapshot status also contains the same failures
+            SnapshotsStatusResponse snapshotsStatusResponse = client.admin().cluster().prepareSnapshotStatus("test-repo").addSnapshots("test-snap").get();
+            assertThat(snapshotsStatusResponse.getSnapshots().size(), equalTo(1));
+            SnapshotStatus snapshotStatus = snapshotsStatusResponse.getSnapshots().get(0);
+            assertThat(snapshotStatus.getIndices().size(), equalTo(1));
+            SnapshotIndexStatus indexStatus = snapshotStatus.getIndices().get("test-idx");
+            assertThat(indexStatus, notNullValue());
+            assertThat(indexStatus.getShardsStats().getFailedShards(), equalTo(snapshotInfo.failedShards()));
+            assertThat(indexStatus.getShardsStats().getDoneShards(), equalTo(snapshotInfo.successfulShards()));
+            assertThat(indexStatus.getShards().size(), equalTo(snapshotInfo.totalShards()));
+
+            int numberOfFailures = 0;
+            for (SnapshotIndexShardStatus shardStatus : indexStatus.getShards().values()) {
+                if (shardStatus.getStage() == SnapshotIndexShardStage.FAILURE) {
+                    assertThat(shardStatus.getFailure(), notNullValue());
+                    numberOfFailures++;
+                } else {
+                    assertThat(shardStatus.getFailure(), nullValue());
+                }
+            }
+            assertThat(indexStatus.getShardsStats().getFailedShards(), equalTo(numberOfFailures));
         }
 
     }
@@ -972,6 +1001,84 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
         }
     }
 
+
+    @Test
+    @TestLogging("cluster.routing.allocation.decider:TRACE")
+    public void snapshotStatusTest() throws Exception {
+        Client client = client();
+        File repositoryLocation = newTempDir(LifecycleScope.TEST);
+        logger.info("-->  creating repository");
+        PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
+                .setType(MockRepositoryModule.class.getCanonicalName()).setSettings(
+                        ImmutableSettings.settingsBuilder()
+                                .put("location", repositoryLocation)
+                                .put("random", randomAsciiOfLength(10))
+                                .put("wait_after_unblock", 200)
+                ).get();
+        assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
+
+        // Create index on 2 nodes and make sure each node has a primary by setting no replicas
+        assertAcked(prepareCreate("test-idx", 2, ImmutableSettings.builder().put("number_of_replicas", 0)));
+
+        logger.info("--> indexing some data");
+        for (int i = 0; i < 100; i++) {
+            index("test-idx", "doc", Integer.toString(i), "foo", "bar" + i);
+        }
+        refresh();
+        assertThat(client.prepareCount("test-idx").get().getCount(), equalTo(100L));
+
+        // Pick one node and block it
+        String blockedNode = blockNodeWithIndex("test-idx");
+
+
+        logger.info("--> snapshot");
+        client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(false).setIndices("test-idx").get();
+
+        logger.info("--> waiting for block to kick in");
+        waitForBlock(blockedNode, "test-repo", TimeValue.timeValueSeconds(60));
+
+        logger.info("--> execution was blocked on node [{}], checking snapshot status", blockedNode);
+        SnapshotsStatusResponse response = client.admin().cluster().prepareSnapshotStatus("test-repo").execute().actionGet();
+
+        logger.info("--> unblocking blocked node");
+        unblockNode(blockedNode);
+
+        assertThat(response.getSnapshots().size(), equalTo(1));
+        SnapshotStatus snapshotStatus = response.getSnapshots().get(0);
+        assertThat(snapshotStatus.getState(), equalTo(SnapshotMetaData.State.STARTED));
+        // We blocked the node during data write operation, so at least one shard snapshot should be in STARTED stage
+        assertThat(snapshotStatus.getShardsStats().getStartedShards(), greaterThan(0));
+        for( SnapshotIndexShardStatus shardStatus : snapshotStatus.getIndices().get("test-idx")) {
+            if (shardStatus.getStage() == SnapshotIndexShardStage.STARTED) {
+                assertThat(shardStatus.getNodeId(), notNullValue());
+            }
+        }
+
+        SnapshotInfo snapshotInfo = waitForCompletion("test-repo", "test-snap", TimeValue.timeValueSeconds(600));
+        logger.info("Number of failed shards [{}]", snapshotInfo.shardFailures().size());
+        logger.info("--> done");
+
+
+        logger.info("--> checking snapshot status again after snapshot is done", blockedNode);
+        response = client.admin().cluster().prepareSnapshotStatus("test-repo").addSnapshots("test-snap").execute().actionGet();
+        snapshotStatus = response.getSnapshots().get(0);
+        assertThat(snapshotStatus.getIndices().size(), equalTo(1));
+        SnapshotIndexStatus indexStatus = snapshotStatus.getIndices().get("test-idx");
+        assertThat(indexStatus, notNullValue());
+        assertThat(indexStatus.getShardsStats().getInitializingShards(), equalTo(0));
+        assertThat(indexStatus.getShardsStats().getFailedShards(), equalTo(snapshotInfo.failedShards()));
+        assertThat(indexStatus.getShardsStats().getDoneShards(), equalTo(snapshotInfo.successfulShards()));
+        assertThat(indexStatus.getShards().size(), equalTo(snapshotInfo.totalShards()));
+
+        try {
+            client.admin().cluster().prepareSnapshotStatus("test-repo").addSnapshots("test-snap-doesnt-exist").execute().actionGet();
+            fail();
+        } catch (SnapshotMissingException ex) {
+            // Expected
+        }
+
+    }
+
     private boolean waitForIndex(String index, TimeValue timeout) throws InterruptedException {
         long start = System.currentTimeMillis();
         while (System.currentTimeMillis() - start < timeout.millis()) {