|
@@ -39,6 +39,7 @@ import org.elasticsearch.action.ActionListener;
|
|
|
import org.elasticsearch.action.ActionRunnable;
|
|
|
import org.elasticsearch.action.StepListener;
|
|
|
import org.elasticsearch.action.support.GroupedActionListener;
|
|
|
+import org.elasticsearch.action.support.PlainListenableActionFuture;
|
|
|
import org.elasticsearch.cluster.ClusterState;
|
|
|
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
|
|
import org.elasticsearch.cluster.RepositoryCleanupInProgress;
|
|
@@ -1358,8 +1359,90 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|
|
}
|
|
|
return;
|
|
|
}
|
|
|
- // Slow path if we were not able to safely read the repository data from cache
|
|
|
- threadPool.generic().execute(ActionRunnable.wrap(listener, this::doGetRepositoryData));
|
|
|
+ if (metadata.generation() == RepositoryData.UNKNOWN_REPO_GEN && isReadOnly() == false) {
|
|
|
+ logger.debug("[{}] loading repository metadata for the first time, trying to determine correct generation and to store " +
|
|
|
+ "it in the cluster state", metadata.name());
|
|
|
+ initializeRepoGenerationTracking(listener);
|
|
|
+ } else {
|
|
|
+ logger.trace("[{}] loading un-cached repository data with best known repository generation [{}]", metadata.name(),
|
|
|
+ latestKnownRepoGen);
|
|
|
+ threadPool.generic().execute(ActionRunnable.wrap(listener, this::doGetRepositoryData));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // Listener used to ensure that repository data is only initialized once in the cluster state by #initializeRepoGenerationTracking
|
|
|
+ private PlainListenableActionFuture<RepositoryData> repoDataInitialized;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Method used to set the current repository generation in the cluster state's {@link RepositoryMetadata} to the latest generation that
|
|
|
+ * can be physically found in the repository before passing the latest {@link RepositoryData} to the given listener.
|
|
|
+ * This ensures that operations using {@link #executeConsistentStateUpdate} right after mounting a fresh repository will have a
|
|
|
+ * consistent view of the {@link RepositoryData} before any data has been written to the repository.
|
|
|
+ *
|
|
|
+ * @param listener listener to resolve with new repository data
|
|
|
+ */
|
|
|
+ private void initializeRepoGenerationTracking(ActionListener<RepositoryData> listener) {
|
|
|
+ synchronized (this) {
|
|
|
+ if (repoDataInitialized == null) {
|
|
|
+ logger.trace("[{}] initializing repository generation in cluster state", metadata.name());
|
|
|
+ repoDataInitialized = PlainListenableActionFuture.newListenableFuture();
|
|
|
+ repoDataInitialized.addListener(listener);
|
|
|
+ final Consumer<Exception> onFailure = e -> {
|
|
|
+ logger.warn(new ParameterizedMessage("[{}] Exception when initializing repository generation in cluster state",
|
|
|
+ metadata.name()), e);
|
|
|
+ final ActionListener<RepositoryData> existingListener;
|
|
|
+ synchronized (BlobStoreRepository.this) {
|
|
|
+ existingListener = repoDataInitialized;
|
|
|
+ repoDataInitialized = null;
|
|
|
+ }
|
|
|
+ existingListener.onFailure(e);
|
|
|
+ };
|
|
|
+ threadPool.generic().execute(() -> doGetRepositoryData(
|
|
|
+ ActionListener.wrap(repoData -> clusterService.submitStateUpdateTask(
|
|
|
+ "set safe repository generation [" + metadata.name() + "][" + repoData + "]",
|
|
|
+ new ClusterStateUpdateTask() {
|
|
|
+ @Override
|
|
|
+ public ClusterState execute(ClusterState currentState) {
|
|
|
+ RepositoryMetadata metadata = getRepoMetadata(currentState);
|
|
|
+ // No update to the repository generation should have occurred concurrently in general except for
|
|
|
+ // extreme corner cases like failing over to an older version master node and back to the current
|
|
|
+ // node concurrently
|
|
|
+ if (metadata.generation() != RepositoryData.UNKNOWN_REPO_GEN) {
|
|
|
+ throw new RepositoryException(
|
|
|
+ metadata.name(), "Found unexpected initialized repo metadata [" + metadata + "]");
|
|
|
+ }
|
|
|
+ return ClusterState.builder(currentState)
|
|
|
+ .metadata(Metadata.builder(currentState.getMetadata()).putCustom(
|
|
|
+ RepositoriesMetadata.TYPE,
|
|
|
+ currentState.metadata().<RepositoriesMetadata>custom(RepositoriesMetadata.TYPE)
|
|
|
+ .withUpdatedGeneration(metadata.name(),
|
|
|
+ repoData.getGenId(), repoData.getGenId()))).build();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void onFailure(String source, Exception e) {
|
|
|
+ onFailure.accept(e);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
|
|
+ // Resolve listeners on generic pool since some callbacks for repository data do additional IO
|
|
|
+ threadPool.generic().execute(() -> {
|
|
|
+ final ActionListener<RepositoryData> existingListener;
|
|
|
+ synchronized (BlobStoreRepository.this) {
|
|
|
+ existingListener = repoDataInitialized;
|
|
|
+ repoDataInitialized = null;
|
|
|
+ }
|
|
|
+ existingListener.onResponse(repoData);
|
|
|
+ });
|
|
|
+ }
|
|
|
+ }), onFailure)));
|
|
|
+ } else {
|
|
|
+ logger.trace("[{}] waiting for existing initialization of repository metadata generation in cluster state",
|
|
|
+ metadata.name());
|
|
|
+ repoDataInitialized.addListener(listener);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
private void doGetRepositoryData(ActionListener<RepositoryData> listener) {
|