|  | @@ -92,16 +92,20 @@ import static org.elasticsearch.cluster.SnapshotsInProgress.completed;
 | 
	
		
			
				|  |  |   * <p>
 | 
	
		
			
				|  |  |   * A typical snapshot creating process looks like this:
 | 
	
		
			
				|  |  |   * <ul>
 | 
	
		
			
				|  |  | - * <li>On the master node the {@link #createSnapshot(SnapshotRequest, CreateSnapshotListener)} is called and makes sure that no snapshots is currently running
 | 
	
		
			
				|  |  | - * and registers the new snapshot in cluster state</li>
 | 
	
		
			
				|  |  | - * <li>When cluster state is updated the {@link #beginSnapshot(ClusterState, SnapshotsInProgress.Entry, boolean, CreateSnapshotListener)} method
 | 
	
		
			
				|  |  | - * kicks in and initializes the snapshot in the repository and then populates list of shards that needs to be snapshotted in cluster state</li>
 | 
	
		
			
				|  |  | + * <li>On the master node the {@link #createSnapshot(SnapshotRequest, CreateSnapshotListener)} is called and makes sure that no snapshots
 | 
	
		
			
				|  |  | + * is currently running and registers the new snapshot in cluster state</li>
 | 
	
		
			
				|  |  | + * <li>When cluster state is updated
 | 
	
		
			
				|  |  | + * the {@link #beginSnapshot(ClusterState, SnapshotsInProgress.Entry, boolean, CreateSnapshotListener)} method kicks in and initializes
 | 
	
		
			
				|  |  | + * the snapshot in the repository and then populates list of shards that needs to be snapshotted in cluster state</li>
 | 
	
		
			
				|  |  |   * <li>Each data node is watching for these shards and when new shards scheduled for snapshotting appear in the cluster state, data nodes
 | 
	
		
			
				|  |  |   * start processing them through {@link SnapshotShardsService#processIndexShardSnapshots(ClusterChangedEvent)} method</li>
 | 
	
		
			
				|  |  | - * <li>Once shard snapshot is created data node updates state of the shard in the cluster state using the {@link SnapshotShardsService#sendSnapshotShardUpdate(Snapshot, ShardId, ShardSnapshotStatus)} method</li>
 | 
	
		
			
				|  |  | - * <li>When last shard is completed master node in {@link SnapshotShardsService#innerUpdateSnapshotState} method marks the snapshot as completed</li>
 | 
	
		
			
				|  |  | + * <li>Once shard snapshot is created data node updates state of the shard in the cluster state using
 | 
	
		
			
				|  |  | + * the {@link SnapshotShardsService#sendSnapshotShardUpdate(Snapshot, ShardId, ShardSnapshotStatus)} method</li>
 | 
	
		
			
				|  |  | + * <li>When last shard is completed master node in {@link SnapshotShardsService#innerUpdateSnapshotState} method marks the snapshot
 | 
	
		
			
				|  |  | + * as completed</li>
 | 
	
		
			
				|  |  |   * <li>After cluster state is updated, the {@link #endSnapshot(SnapshotsInProgress.Entry)} finalizes snapshot in the repository,
 | 
	
		
			
				|  |  | - * notifies all {@link #snapshotCompletionListeners} that snapshot is completed, and finally calls {@link #removeSnapshotFromClusterState(Snapshot, SnapshotInfo, Exception)} to remove snapshot from cluster state</li>
 | 
	
		
			
				|  |  | + * notifies all {@link #snapshotCompletionListeners} that snapshot is completed, and finally calls
 | 
	
		
			
				|  |  | + * {@link #removeSnapshotFromClusterState(Snapshot, SnapshotInfo, Exception)} to remove snapshot from cluster state</li>
 | 
	
		
			
				|  |  |   * </ul>
 | 
	
		
			
				|  |  |   */
 | 
	
		
			
				|  |  |  public class SnapshotsService extends AbstractLifecycleComponent implements ClusterStateApplier {
 | 
	
	
		
			
				|  | @@ -118,7 +122,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
 | 
	
		
			
				|  |  |      private final CopyOnWriteArrayList<SnapshotCompletionListener> snapshotCompletionListeners = new CopyOnWriteArrayList<>();
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      @Inject
 | 
	
		
			
				|  |  | -    public SnapshotsService(Settings settings, ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver, RepositoriesService repositoriesService, ThreadPool threadPool) {
 | 
	
		
			
				|  |  | +    public SnapshotsService(Settings settings, ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver,
 | 
	
		
			
				|  |  | +                            RepositoriesService repositoriesService, ThreadPool threadPool) {
 | 
	
		
			
				|  |  |          super(settings);
 | 
	
		
			
				|  |  |          this.clusterService = clusterService;
 | 
	
		
			
				|  |  |          this.indexNameExpressionResolver = indexNameExpressionResolver;
 | 
	
	
		
			
				|  | @@ -253,7 +258,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
 | 
	
		
			
				|  |  |                  SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
 | 
	
		
			
				|  |  |                  if (snapshots == null || snapshots.entries().isEmpty()) {
 | 
	
		
			
				|  |  |                      // Store newSnapshot here to be processed in clusterStateProcessed
 | 
	
		
			
				|  |  | -                    List<String> indices = Arrays.asList(indexNameExpressionResolver.concreteIndexNames(currentState, request.indicesOptions(), request.indices()));
 | 
	
		
			
				|  |  | +                    List<String> indices = Arrays.asList(indexNameExpressionResolver.concreteIndexNames(currentState,
 | 
	
		
			
				|  |  | +                                                        request.indicesOptions(), request.indices()));
 | 
	
		
			
				|  |  |                      logger.trace("[{}][{}] creating snapshot for indices [{}]", repositoryName, snapshotName, indices);
 | 
	
		
			
				|  |  |                      List<IndexId> snapshotIndices = repositoryData.resolveNewIndices(indices);
 | 
	
		
			
				|  |  |                      newSnapshot = new SnapshotsInProgress.Entry(new Snapshot(repositoryName, snapshotId),
 | 
	
	
		
			
				|  | @@ -393,9 +399,11 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |                              if (entry.state() != State.ABORTED) {
 | 
	
		
			
				|  |  |                                  // Replace the snapshot that was just intialized
 | 
	
		
			
				|  |  | -                                ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shards = shards(currentState, entry.indices());
 | 
	
		
			
				|  |  | +                                ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shards =
 | 
	
		
			
				|  |  | +                                        shards(currentState, entry.indices());
 | 
	
		
			
				|  |  |                                  if (!partial) {
 | 
	
		
			
				|  |  | -                                    Tuple<Set<String>, Set<String>> indicesWithMissingShards = indicesWithMissingShards(shards, currentState.metaData());
 | 
	
		
			
				|  |  | +                                    Tuple<Set<String>, Set<String>> indicesWithMissingShards = indicesWithMissingShards(shards,
 | 
	
		
			
				|  |  | +                                        currentState.metaData());
 | 
	
		
			
				|  |  |                                      Set<String> missing = indicesWithMissingShards.v1();
 | 
	
		
			
				|  |  |                                      Set<String> closed = indicesWithMissingShards.v2();
 | 
	
		
			
				|  |  |                                      if (missing.isEmpty() == false || closed.isEmpty() == false) {
 | 
	
	
		
			
				|  | @@ -437,8 +445,10 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |                      @Override
 | 
	
		
			
				|  |  |                      public void onFailure(String source, Exception e) {
 | 
	
		
			
				|  |  | -                        logger.warn(() -> new ParameterizedMessage("[{}] failed to create snapshot", snapshot.snapshot().getSnapshotId()), e);
 | 
	
		
			
				|  |  | -                        removeSnapshotFromClusterState(snapshot.snapshot(), null, e, new CleanupAfterErrorListener(snapshot, true, userCreateSnapshotListener, e));
 | 
	
		
			
				|  |  | +                        logger.warn(() -> new ParameterizedMessage("[{}] failed to create snapshot",
 | 
	
		
			
				|  |  | +                            snapshot.snapshot().getSnapshotId()), e);
 | 
	
		
			
				|  |  | +                        removeSnapshotFromClusterState(snapshot.snapshot(), null, e,
 | 
	
		
			
				|  |  | +                            new CleanupAfterErrorListener(snapshot, true, userCreateSnapshotListener, e));
 | 
	
		
			
				|  |  |                      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |                      @Override
 | 
	
	
		
			
				|  | @@ -471,8 +481,10 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |              @Override
 | 
	
		
			
				|  |  |              public void onFailure(Exception e) {
 | 
	
		
			
				|  |  | -                logger.warn(() -> new ParameterizedMessage("failed to create snapshot [{}]", snapshot.snapshot().getSnapshotId()), e);
 | 
	
		
			
				|  |  | -                removeSnapshotFromClusterState(snapshot.snapshot(), null, e, new CleanupAfterErrorListener(snapshot, snapshotCreated, userCreateSnapshotListener, e));
 | 
	
		
			
				|  |  | +                logger.warn(() -> new ParameterizedMessage("failed to create snapshot [{}]",
 | 
	
		
			
				|  |  | +                    snapshot.snapshot().getSnapshotId()), e);
 | 
	
		
			
				|  |  | +                removeSnapshotFromClusterState(snapshot.snapshot(), null, e,
 | 
	
		
			
				|  |  | +                    new CleanupAfterErrorListener(snapshot, snapshotCreated, userCreateSnapshotListener, e));
 | 
	
		
			
				|  |  |              }
 | 
	
		
			
				|  |  |          });
 | 
	
		
			
				|  |  |      }
 | 
	
	
		
			
				|  | @@ -484,7 +496,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
 | 
	
		
			
				|  |  |          private final CreateSnapshotListener userCreateSnapshotListener;
 | 
	
		
			
				|  |  |          private final Exception e;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        CleanupAfterErrorListener(SnapshotsInProgress.Entry snapshot, boolean snapshotCreated, CreateSnapshotListener userCreateSnapshotListener, Exception e) {
 | 
	
		
			
				|  |  | +        CleanupAfterErrorListener(SnapshotsInProgress.Entry snapshot, boolean snapshotCreated,
 | 
	
		
			
				|  |  | +                                  CreateSnapshotListener userCreateSnapshotListener, Exception e) {
 | 
	
		
			
				|  |  |              this.snapshot = snapshot;
 | 
	
		
			
				|  |  |              this.snapshotCreated = snapshotCreated;
 | 
	
		
			
				|  |  |              this.userCreateSnapshotListener = userCreateSnapshotListener;
 | 
	
	
		
			
				|  | @@ -520,7 +533,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
 | 
	
		
			
				|  |  |                                                           snapshot.includeGlobalState());
 | 
	
		
			
				|  |  |                  } catch (Exception inner) {
 | 
	
		
			
				|  |  |                      inner.addSuppressed(exception);
 | 
	
		
			
				|  |  | -                    logger.warn(() -> new ParameterizedMessage("[{}] failed to close snapshot in repository", snapshot.snapshot()), inner);
 | 
	
		
			
				|  |  | +                    logger.warn(() -> new ParameterizedMessage("[{}] failed to close snapshot in repository",
 | 
	
		
			
				|  |  | +                        snapshot.snapshot()), inner);
 | 
	
		
			
				|  |  |                  }
 | 
	
		
			
				|  |  |              }
 | 
	
		
			
				|  |  |              userCreateSnapshotListener.onFailure(e);
 | 
	
	
		
			
				|  | @@ -744,8 +758,10 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
 | 
	
		
			
				|  |  |                                      } else {
 | 
	
		
			
				|  |  |                                          // TODO: Restart snapshot on another node?
 | 
	
		
			
				|  |  |                                          snapshotChanged = true;
 | 
	
		
			
				|  |  | -                                        logger.warn("failing snapshot of shard [{}] on closed node [{}]", shardEntry.key, shardStatus.nodeId());
 | 
	
		
			
				|  |  | -                                        shards.put(shardEntry.key, new ShardSnapshotStatus(shardStatus.nodeId(), State.FAILED, "node shutdown"));
 | 
	
		
			
				|  |  | +                                        logger.warn("failing snapshot of shard [{}] on closed node [{}]",
 | 
	
		
			
				|  |  | +                                            shardEntry.key, shardStatus.nodeId());
 | 
	
		
			
				|  |  | +                                        shards.put(shardEntry.key,
 | 
	
		
			
				|  |  | +                                            new ShardSnapshotStatus(shardStatus.nodeId(), State.FAILED, "node shutdown"));
 | 
	
		
			
				|  |  |                                      }
 | 
	
		
			
				|  |  |                                  }
 | 
	
		
			
				|  |  |                              }
 | 
	
	
		
			
				|  | @@ -808,7 +824,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
 | 
	
		
			
				|  |  |                          for (final SnapshotsInProgress.Entry snapshot : snapshots.entries()) {
 | 
	
		
			
				|  |  |                              SnapshotsInProgress.Entry updatedSnapshot = snapshot;
 | 
	
		
			
				|  |  |                              if (snapshot.state() == State.STARTED) {
 | 
	
		
			
				|  |  | -                                ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards = processWaitingShards(snapshot.shards(), routingTable);
 | 
	
		
			
				|  |  | +                                ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards = processWaitingShards(snapshot.shards(),
 | 
	
		
			
				|  |  | +                                    routingTable);
 | 
	
		
			
				|  |  |                                  if (shards != null) {
 | 
	
		
			
				|  |  |                                      changed = true;
 | 
	
		
			
				|  |  |                                      if (!snapshot.state().completed() && completed(shards.values())) {
 | 
	
	
		
			
				|  | @@ -831,7 +848,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |                  @Override
 | 
	
		
			
				|  |  |                  public void onFailure(String source, Exception e) {
 | 
	
		
			
				|  |  | -                    logger.warn(() -> new ParameterizedMessage("failed to update snapshot state after shards started from [{}] ", source), e);
 | 
	
		
			
				|  |  | +                    logger.warn(() ->
 | 
	
		
			
				|  |  | +                        new ParameterizedMessage("failed to update snapshot state after shards started from [{}] ", source), e);
 | 
	
		
			
				|  |  |                  }
 | 
	
		
			
				|  |  |              });
 | 
	
		
			
				|  |  |          }
 | 
	
	
		
			
				|  | @@ -929,12 +947,14 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
 | 
	
		
			
				|  |  |       * @param shards list of shard statuses
 | 
	
		
			
				|  |  |       * @return list of failed and closed indices
 | 
	
		
			
				|  |  |       */
 | 
	
		
			
				|  |  | -    private Tuple<Set<String>, Set<String>> indicesWithMissingShards(ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shards, MetaData metaData) {
 | 
	
		
			
				|  |  | +    private Tuple<Set<String>, Set<String>> indicesWithMissingShards(
 | 
	
		
			
				|  |  | +        ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shards, MetaData metaData) {
 | 
	
		
			
				|  |  |          Set<String> missing = new HashSet<>();
 | 
	
		
			
				|  |  |          Set<String> closed = new HashSet<>();
 | 
	
		
			
				|  |  |          for (ObjectObjectCursor<ShardId, SnapshotsInProgress.ShardSnapshotStatus> entry : shards) {
 | 
	
		
			
				|  |  |              if (entry.value.state() == State.MISSING) {
 | 
	
		
			
				|  |  | -                if (metaData.hasIndex(entry.key.getIndex().getName()) && metaData.getIndexSafe(entry.key.getIndex()).getState() == IndexMetaData.State.CLOSE) {
 | 
	
		
			
				|  |  | +                if (metaData.hasIndex(entry.key.getIndex().getName()) &&
 | 
	
		
			
				|  |  | +                    metaData.getIndexSafe(entry.key.getIndex()).getState() == IndexMetaData.State.CLOSE) {
 | 
	
		
			
				|  |  |                      closed.add(entry.key.getIndex().getName());
 | 
	
		
			
				|  |  |                  } else {
 | 
	
		
			
				|  |  |                      missing.add(entry.key.getIndex().getName());
 | 
	
	
		
			
				|  | @@ -1130,7 +1150,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
 | 
	
		
			
				|  |  |              public ClusterState execute(ClusterState currentState) throws Exception {
 | 
	
		
			
				|  |  |                  SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE);
 | 
	
		
			
				|  |  |                  if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) {
 | 
	
		
			
				|  |  | -                    throw new ConcurrentSnapshotExecutionException(snapshot, "cannot delete - another snapshot is currently being deleted");
 | 
	
		
			
				|  |  | +                    throw new ConcurrentSnapshotExecutionException(snapshot,
 | 
	
		
			
				|  |  | +                        "cannot delete - another snapshot is currently being deleted");
 | 
	
		
			
				|  |  |                  }
 | 
	
		
			
				|  |  |                  RestoreInProgress restoreInProgress = currentState.custom(RestoreInProgress.TYPE);
 | 
	
		
			
				|  |  |                  if (restoreInProgress != null) {
 | 
	
	
		
			
				|  | @@ -1236,7 +1257,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
 | 
	
		
			
				|  |  |                                                  listener, true);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |                                          } catch (Exception ex) {
 | 
	
		
			
				|  |  | -                                            logger.warn(() -> new ParameterizedMessage("[{}] failed to delete snapshot", snapshot), ex);
 | 
	
		
			
				|  |  | +                                            logger.warn(() ->
 | 
	
		
			
				|  |  | +                                                new ParameterizedMessage("[{}] failed to delete snapshot", snapshot), ex);
 | 
	
		
			
				|  |  |                                          }
 | 
	
		
			
				|  |  |                                      }
 | 
	
		
			
				|  |  |                                  );
 | 
	
	
		
			
				|  | @@ -1384,7 +1406,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
 | 
	
		
			
				|  |  |              IndexMetaData indexMetaData = metaData.index(indexName);
 | 
	
		
			
				|  |  |              if (indexMetaData == null) {
 | 
	
		
			
				|  |  |                  // The index was deleted before we managed to start the snapshot - mark it as missing.
 | 
	
		
			
				|  |  | -                builder.put(new ShardId(indexName, IndexMetaData.INDEX_UUID_NA_VALUE, 0), new SnapshotsInProgress.ShardSnapshotStatus(null, State.MISSING, "missing index"));
 | 
	
		
			
				|  |  | +                builder.put(new ShardId(indexName, IndexMetaData.INDEX_UUID_NA_VALUE, 0),
 | 
	
		
			
				|  |  | +                    new SnapshotsInProgress.ShardSnapshotStatus(null, State.MISSING, "missing index"));
 | 
	
		
			
				|  |  |              } else if (indexMetaData.getState() == IndexMetaData.State.CLOSE) {
 | 
	
		
			
				|  |  |                  for (int i = 0; i < indexMetaData.getNumberOfShards(); i++) {
 | 
	
		
			
				|  |  |                      ShardId shardId = new ShardId(indexMetaData.getIndex(), i);
 | 
	
	
		
			
				|  | @@ -1397,17 +1420,22 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
 | 
	
		
			
				|  |  |                      if (indexRoutingTable != null) {
 | 
	
		
			
				|  |  |                          ShardRouting primary = indexRoutingTable.shard(i).primaryShard();
 | 
	
		
			
				|  |  |                          if (primary == null || !primary.assignedToNode()) {
 | 
	
		
			
				|  |  | -                            builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(null, State.MISSING, "primary shard is not allocated"));
 | 
	
		
			
				|  |  | +                            builder.put(shardId,
 | 
	
		
			
				|  |  | +                                new SnapshotsInProgress.ShardSnapshotStatus(null, State.MISSING, "primary shard is not allocated"));
 | 
	
		
			
				|  |  |                          } else if (primary.relocating() || primary.initializing()) {
 | 
	
		
			
				|  |  | -                            // The WAITING state was introduced in V1.2.0 - don't use it if there are nodes with older version in the cluster
 | 
	
		
			
				|  |  | +                            // The WAITING state was introduced in V1.2.0 -
 | 
	
		
			
				|  |  | +                            // don't use it if there are nodes with older version in the cluster
 | 
	
		
			
				|  |  |                              builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(primary.currentNodeId(), State.WAITING));
 | 
	
		
			
				|  |  |                          } else if (!primary.started()) {
 | 
	
		
			
				|  |  | -                            builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(primary.currentNodeId(), State.MISSING, "primary shard hasn't been started yet"));
 | 
	
		
			
				|  |  | +                            builder.put(shardId,
 | 
	
		
			
				|  |  | +                                new SnapshotsInProgress.ShardSnapshotStatus(primary.currentNodeId(), State.MISSING,
 | 
	
		
			
				|  |  | +                                    "primary shard hasn't been started yet"));
 | 
	
		
			
				|  |  |                          } else {
 | 
	
		
			
				|  |  |                              builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(primary.currentNodeId()));
 | 
	
		
			
				|  |  |                          }
 | 
	
		
			
				|  |  |                      } else {
 | 
	
		
			
				|  |  | -                        builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(null, State.MISSING, "missing routing table"));
 | 
	
		
			
				|  |  | +                        builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(null, State.MISSING,
 | 
	
		
			
				|  |  | +                            "missing routing table"));
 | 
	
		
			
				|  |  |                      }
 | 
	
		
			
				|  |  |                  }
 | 
	
		
			
				|  |  |              }
 |