|  | @@ -25,7 +25,7 @@ import org.elasticsearch.ExceptionsHelper;
 | 
	
		
			
				|  |  |  import org.elasticsearch.Version;
 | 
	
		
			
				|  |  |  import org.elasticsearch.action.ActionListener;
 | 
	
		
			
				|  |  |  import org.elasticsearch.action.ActionRunnable;
 | 
	
		
			
				|  |  | -import org.elasticsearch.action.ResultDeduplicator;
 | 
	
		
			
				|  |  | +import org.elasticsearch.action.SingleResultDeduplicator;
 | 
	
		
			
				|  |  |  import org.elasticsearch.action.StepListener;
 | 
	
		
			
				|  |  |  import org.elasticsearch.action.support.GroupedActionListener;
 | 
	
		
			
				|  |  |  import org.elasticsearch.action.support.ListenableActionFuture;
 | 
	
	
		
			
				|  | @@ -413,7 +413,11 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
 | 
	
		
			
				|  |  |          this.namedXContentRegistry = namedXContentRegistry;
 | 
	
		
			
				|  |  |          this.basePath = basePath;
 | 
	
		
			
				|  |  |          this.maxSnapshotCount = MAX_SNAPSHOTS_SETTING.get(metadata.settings());
 | 
	
		
			
				|  |  | -        this.repoDataDeduplicator = new ResultDeduplicator<>(threadPool.getThreadContext());
 | 
	
		
			
				|  |  | +        this.repoDataLoadDeduplicator = new SingleResultDeduplicator<>(
 | 
	
		
			
				|  |  | +            threadPool.getThreadContext(),
 | 
	
		
			
				|  |  | +            listener -> threadPool.executor(ThreadPool.Names.SNAPSHOT_META)
 | 
	
		
			
				|  |  | +                .execute(ActionRunnable.wrap(listener, this::doGetRepositoryData))
 | 
	
		
			
				|  |  | +        );
 | 
	
		
			
				|  |  |          shardSnapshotTaskRunner = new ShardSnapshotTaskRunner(
 | 
	
		
			
				|  |  |              threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(),
 | 
	
		
			
				|  |  |              threadPool.executor(ThreadPool.Names.SNAPSHOT),
 | 
	
	
		
			
				|  | @@ -1787,19 +1791,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
 | 
	
		
			
				|  |  |                  metadata.name(),
 | 
	
		
			
				|  |  |                  latestKnownRepoGen
 | 
	
		
			
				|  |  |              );
 | 
	
		
			
				|  |  | -            // Don't deduplicate repo data loading if we don't have strong consistency guarantees between the repo and the cluster state
 | 
	
		
			
				|  |  | -            // Also, if we are not caching repository data (for tests) we assume that the contents of the repository data at a given
 | 
	
		
			
				|  |  | -            // generation may change
 | 
	
		
			
				|  |  | -            final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT_META);
 | 
	
		
			
				|  |  | -            if (bestEffortConsistency || cacheRepositoryData == false) {
 | 
	
		
			
				|  |  | -                executor.execute(ActionRunnable.wrap(listener, this::doGetRepositoryData));
 | 
	
		
			
				|  |  | -            } else {
 | 
	
		
			
				|  |  | -                repoDataDeduplicator.executeOnce(
 | 
	
		
			
				|  |  | -                    metadata,
 | 
	
		
			
				|  |  | -                    listener,
 | 
	
		
			
				|  |  | -                    (metadata, l) -> executor.execute(ActionRunnable.wrap(l, this::doGetRepositoryData))
 | 
	
		
			
				|  |  | -                );
 | 
	
		
			
				|  |  | -            }
 | 
	
		
			
				|  |  | +            repoDataLoadDeduplicator.execute(listener);
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -1843,78 +1835,70 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
 | 
	
		
			
				|  |  |                      }
 | 
	
		
			
				|  |  |                      existingListener.onFailure(e);
 | 
	
		
			
				|  |  |                  };
 | 
	
		
			
				|  |  | -                threadPool.generic()
 | 
	
		
			
				|  |  | -                    .execute(
 | 
	
		
			
				|  |  | -                        ActionRunnable.wrap(
 | 
	
		
			
				|  |  | -                            ActionListener.wrap(
 | 
	
		
			
				|  |  | -                                repoData -> submitUnbatchedTask(
 | 
	
		
			
				|  |  | -                                    "set initial safe repository generation [" + metadata.name() + "][" + repoData.getGenId() + "]",
 | 
	
		
			
				|  |  | -                                    new ClusterStateUpdateTask() {
 | 
	
		
			
				|  |  | -                                        @Override
 | 
	
		
			
				|  |  | -                                        public ClusterState execute(ClusterState currentState) {
 | 
	
		
			
				|  |  | -                                            RepositoryMetadata metadata = getRepoMetadata(currentState);
 | 
	
		
			
				|  |  | -                                            // No update to the repository generation should have occurred concurrently in general except
 | 
	
		
			
				|  |  | -                                            // for
 | 
	
		
			
				|  |  | -                                            // extreme corner cases like failing over to an older version master node and back to the
 | 
	
		
			
				|  |  | -                                            // current
 | 
	
		
			
				|  |  | -                                            // node concurrently
 | 
	
		
			
				|  |  | -                                            if (metadata.generation() != RepositoryData.UNKNOWN_REPO_GEN) {
 | 
	
		
			
				|  |  | -                                                throw new RepositoryException(
 | 
	
		
			
				|  |  | -                                                    metadata.name(),
 | 
	
		
			
				|  |  | -                                                    "Found unexpected initialized repo metadata [" + metadata + "]"
 | 
	
		
			
				|  |  | -                                                );
 | 
	
		
			
				|  |  | -                                            }
 | 
	
		
			
				|  |  | -                                            return ClusterState.builder(currentState)
 | 
	
		
			
				|  |  | -                                                .metadata(
 | 
	
		
			
				|  |  | -                                                    Metadata.builder(currentState.getMetadata())
 | 
	
		
			
				|  |  | -                                                        .putCustom(
 | 
	
		
			
				|  |  | -                                                            RepositoriesMetadata.TYPE,
 | 
	
		
			
				|  |  | -                                                            currentState.metadata()
 | 
	
		
			
				|  |  | -                                                                .<RepositoriesMetadata>custom(RepositoriesMetadata.TYPE)
 | 
	
		
			
				|  |  | -                                                                .withUpdatedGeneration(
 | 
	
		
			
				|  |  | -                                                                    metadata.name(),
 | 
	
		
			
				|  |  | -                                                                    repoData.getGenId(),
 | 
	
		
			
				|  |  | -                                                                    repoData.getGenId()
 | 
	
		
			
				|  |  | -                                                                )
 | 
	
		
			
				|  |  | -                                                        )
 | 
	
		
			
				|  |  | +                repoDataLoadDeduplicator.execute(
 | 
	
		
			
				|  |  | +                    ActionListener.wrap(
 | 
	
		
			
				|  |  | +                        repoData -> submitUnbatchedTask(
 | 
	
		
			
				|  |  | +                            "set initial safe repository generation [" + metadata.name() + "][" + repoData.getGenId() + "]",
 | 
	
		
			
				|  |  | +                            new ClusterStateUpdateTask() {
 | 
	
		
			
				|  |  | +                                @Override
 | 
	
		
			
				|  |  | +                                public ClusterState execute(ClusterState currentState) {
 | 
	
		
			
				|  |  | +                                    RepositoryMetadata metadata = getRepoMetadata(currentState);
 | 
	
		
			
				|  |  | +                                    // No update to the repository generation should have occurred concurrently in general except
 | 
	
		
			
				|  |  | +                                    // for
 | 
	
		
			
				|  |  | +                                    // extreme corner cases like failing over to an older version master node and back to the
 | 
	
		
			
				|  |  | +                                    // current
 | 
	
		
			
				|  |  | +                                    // node concurrently
 | 
	
		
			
				|  |  | +                                    if (metadata.generation() != RepositoryData.UNKNOWN_REPO_GEN) {
 | 
	
		
			
				|  |  | +                                        throw new RepositoryException(
 | 
	
		
			
				|  |  | +                                            metadata.name(),
 | 
	
		
			
				|  |  | +                                            "Found unexpected initialized repo metadata [" + metadata + "]"
 | 
	
		
			
				|  |  | +                                        );
 | 
	
		
			
				|  |  | +                                    }
 | 
	
		
			
				|  |  | +                                    return ClusterState.builder(currentState)
 | 
	
		
			
				|  |  | +                                        .metadata(
 | 
	
		
			
				|  |  | +                                            Metadata.builder(currentState.getMetadata())
 | 
	
		
			
				|  |  | +                                                .putCustom(
 | 
	
		
			
				|  |  | +                                                    RepositoriesMetadata.TYPE,
 | 
	
		
			
				|  |  | +                                                    currentState.metadata()
 | 
	
		
			
				|  |  | +                                                        .<RepositoriesMetadata>custom(RepositoriesMetadata.TYPE)
 | 
	
		
			
				|  |  | +                                                        .withUpdatedGeneration(metadata.name(), repoData.getGenId(), repoData.getGenId())
 | 
	
		
			
				|  |  |                                                  )
 | 
	
		
			
				|  |  | -                                                .build();
 | 
	
		
			
				|  |  | -                                        }
 | 
	
		
			
				|  |  | +                                        )
 | 
	
		
			
				|  |  | +                                        .build();
 | 
	
		
			
				|  |  | +                                }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -                                        @Override
 | 
	
		
			
				|  |  | -                                        public void onFailure(Exception e) {
 | 
	
		
			
				|  |  | -                                            onFailure.accept(e);
 | 
	
		
			
				|  |  | -                                        }
 | 
	
		
			
				|  |  | +                                @Override
 | 
	
		
			
				|  |  | +                                public void onFailure(Exception e) {
 | 
	
		
			
				|  |  | +                                    onFailure.accept(e);
 | 
	
		
			
				|  |  | +                                }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -                                        @Override
 | 
	
		
			
				|  |  | -                                        public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
 | 
	
		
			
				|  |  | -                                            logger.trace(
 | 
	
		
			
				|  |  | -                                                "[{}] initialized repository generation in cluster state to [{}]",
 | 
	
		
			
				|  |  | -                                                metadata.name(),
 | 
	
		
			
				|  |  | -                                                repoData.getGenId()
 | 
	
		
			
				|  |  | -                                            );
 | 
	
		
			
				|  |  | -                                            // Resolve listeners on generic pool since some callbacks for repository data do additional IO
 | 
	
		
			
				|  |  | -                                            threadPool.generic().execute(() -> {
 | 
	
		
			
				|  |  | -                                                final ActionListener<RepositoryData> existingListener;
 | 
	
		
			
				|  |  | -                                                synchronized (BlobStoreRepository.this) {
 | 
	
		
			
				|  |  | -                                                    existingListener = repoDataInitialized;
 | 
	
		
			
				|  |  | -                                                    repoDataInitialized = null;
 | 
	
		
			
				|  |  | -                                                }
 | 
	
		
			
				|  |  | -                                                existingListener.onResponse(repoData);
 | 
	
		
			
				|  |  | -                                                logger.trace(
 | 
	
		
			
				|  |  | -                                                    "[{}] called listeners after initializing repository to generation [{}]",
 | 
	
		
			
				|  |  | -                                                    metadata.name(),
 | 
	
		
			
				|  |  | -                                                    repoData.getGenId()
 | 
	
		
			
				|  |  | -                                                );
 | 
	
		
			
				|  |  | -                                            });
 | 
	
		
			
				|  |  | +                                @Override
 | 
	
		
			
				|  |  | +                                public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
 | 
	
		
			
				|  |  | +                                    logger.trace(
 | 
	
		
			
				|  |  | +                                        "[{}] initialized repository generation in cluster state to [{}]",
 | 
	
		
			
				|  |  | +                                        metadata.name(),
 | 
	
		
			
				|  |  | +                                        repoData.getGenId()
 | 
	
		
			
				|  |  | +                                    );
 | 
	
		
			
				|  |  | +                                    // Resolve listeners on generic pool since some callbacks for repository data do additional IO
 | 
	
		
			
				|  |  | +                                    threadPool.generic().execute(() -> {
 | 
	
		
			
				|  |  | +                                        final ActionListener<RepositoryData> existingListener;
 | 
	
		
			
				|  |  | +                                        synchronized (BlobStoreRepository.this) {
 | 
	
		
			
				|  |  | +                                            existingListener = repoDataInitialized;
 | 
	
		
			
				|  |  | +                                            repoDataInitialized = null;
 | 
	
		
			
				|  |  |                                          }
 | 
	
		
			
				|  |  | -                                    }
 | 
	
		
			
				|  |  | -                                ),
 | 
	
		
			
				|  |  | -                                onFailure
 | 
	
		
			
				|  |  | -                            ),
 | 
	
		
			
				|  |  | -                            this::doGetRepositoryData
 | 
	
		
			
				|  |  | -                        )
 | 
	
		
			
				|  |  | -                    );
 | 
	
		
			
				|  |  | +                                        existingListener.onResponse(repoData);
 | 
	
		
			
				|  |  | +                                        logger.trace(
 | 
	
		
			
				|  |  | +                                            "[{}] called listeners after initializing repository to generation [{}]",
 | 
	
		
			
				|  |  | +                                            metadata.name(),
 | 
	
		
			
				|  |  | +                                            repoData.getGenId()
 | 
	
		
			
				|  |  | +                                        );
 | 
	
		
			
				|  |  | +                                    });
 | 
	
		
			
				|  |  | +                                }
 | 
	
		
			
				|  |  | +                            }
 | 
	
		
			
				|  |  | +                        ),
 | 
	
		
			
				|  |  | +                        onFailure
 | 
	
		
			
				|  |  | +                    )
 | 
	
		
			
				|  |  | +                );
 | 
	
		
			
				|  |  |              } else {
 | 
	
		
			
				|  |  |                  logger.trace(
 | 
	
		
			
				|  |  |                      "[{}] waiting for existing initialization of repository metadata generation in cluster state",
 | 
	
	
		
			
				|  | @@ -1926,11 +1910,9 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      /**
 | 
	
		
			
				|  |  | -     * {@link RepositoryData} loading deduplicator. This may only be used with consistent generation repositories, meaning
 | 
	
		
			
				|  |  | -     * {@link #bestEffortConsistency} must be {@code false}, in which case we can assume that the {@link RepositoryData} loaded is
 | 
	
		
			
				|  |  | -     * unique for a given value of {@link #metadata} at any point in time.
 | 
	
		
			
				|  |  | +     * Deduplicator that deduplicates the physical loading of {@link RepositoryData} from the repositories' underlying storage.
 | 
	
		
			
				|  |  |       */
 | 
	
		
			
				|  |  | -    private final ResultDeduplicator<RepositoryMetadata, RepositoryData> repoDataDeduplicator;
 | 
	
		
			
				|  |  | +    private final SingleResultDeduplicator<RepositoryData> repoDataLoadDeduplicator;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      private void doGetRepositoryData(ActionListener<RepositoryData> listener) {
 | 
	
		
			
				|  |  |          // Retry loading RepositoryData in a loop in case we run into concurrent modifications of the repository.
 |