@@ -79,6 +79,7 @@ import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
+import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -99,6 +100,7 @@ import org.elasticsearch.index.store.Store;
 import org.elasticsearch.index.store.StoreFileMetadata;
 import org.elasticsearch.indices.recovery.RecoveryState;
 import org.elasticsearch.repositories.IndexId;
+import org.elasticsearch.repositories.IndexMetaDataGenerations;
 import org.elasticsearch.repositories.Repository;
 import org.elasticsearch.repositories.RepositoryCleanupResult;
 import org.elasticsearch.repositories.RepositoryData;
@@ -552,7 +554,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                     // delete an index that was created by another master node after writing this index-N blob.
                     final Map<String, BlobContainer> foundIndices = blobStore().blobContainer(indicesPath()).children();
                     doDeleteShardSnapshots(snapshotIds, repositoryStateId, foundIndices, rootBlobs, repositoryData,
-                        SnapshotsService.useShardGenerations(repositoryMetaVersion), listener);
+                        repositoryMetaVersion, listener);
                 }

                 @Override
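Note on the change above: instead of pre-computing `SnapshotsService.useShardGenerations(repositoryMetaVersion)` at the call site, the raw repository metadata version is now passed down, so the callee can derive every format decision from a single value. A minimal sketch of that pattern, with invented version constants (not the real Elasticsearch gates):

```java
import java.util.List;

// Sketch: deriving independent format capabilities from one repository
// metadata version, instead of threading one boolean per capability.
public class VersionGates {
    // Hypothetical minimum versions for each on-disk format feature;
    // these constants are assumptions, not the real Elasticsearch values.
    static final int SHARD_GEN_VERSION = 76;
    static final int INDEX_GEN_VERSION = 78;

    static boolean useShardGenerations(int repoMetaVersion) {
        return repoMetaVersion >= SHARD_GEN_VERSION;
    }

    static boolean useIndexGenerations(int repoMetaVersion) {
        return repoMetaVersion >= INDEX_GEN_VERSION;
    }

    public static void main(String[] args) {
        for (int version : List.of(75, 76, 78)) {
            System.out.printf("v%d: shardGens=%b indexGens=%b%n",
                version, useShardGenerations(version), useIndexGenerations(version));
        }
    }
}
```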
@@ -612,10 +614,10 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
      * @param listener Listener to invoke once finished
      */
     private void doDeleteShardSnapshots(Collection<SnapshotId> snapshotIds, long repositoryStateId, Map<String, BlobContainer> foundIndices,
-                                        Map<String, BlobMetadata> rootBlobs, RepositoryData repositoryData, boolean writeShardGens,
+                                        Map<String, BlobMetadata> rootBlobs, RepositoryData repositoryData, Version repoMetaVersion,
                                         ActionListener<Void> listener) {

-        if (writeShardGens) {
+        if (SnapshotsService.useShardGenerations(repoMetaVersion)) {
             // First write the new shard state metadata (with the removed snapshot) and compute deletion targets
             final StepListener<Collection<ShardSnapshotMetaDeleteResult>> writeShardMetaDataAndComputeDeletesStep = new StepListener<>();
             writeUpdatedShardMetaDataAndComputeDeletes(snapshotIds, repositoryData, true, writeShardMetaDataAndComputeDeletesStep);
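The shard-generations branch chains asynchronous steps: write updated shard-level metadata, then write the new index-N blob, then run clean-ups. A rough JDK-futures sketch of that chaining (Elasticsearch's `StepListener` is not reproduced here; this only models the control flow):

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Sketch: chain async steps the way doDeleteShardSnapshots chains
// StepListeners: compute deletes -> write repo data -> run cleanups.
public class DeleteSteps {
    public static void main(String[] args) {
        CompletableFuture<List<String>> computeDeletes =
            CompletableFuture.supplyAsync(() -> List.of("shard-blob-1", "shard-blob-2"));
        computeDeletes
            .thenApply(deletes -> {
                // Stands in for writing the updated index-N blob.
                System.out.println("writing updated repository data");
                return deletes;
            })
            .thenAccept(deletes -> System.out.println("cleaning up " + deletes))
            .join();
    }
}
```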
@@ -633,7 +635,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                     builder.put(newGen.indexId, newGen.shardId, newGen.newGeneration);
                 }
                 final RepositoryData updatedRepoData = repositoryData.removeSnapshots(snapshotIds, builder.build());
-                writeIndexGen(updatedRepoData, repositoryStateId, true, Function.identity(),
+                writeIndexGen(updatedRepoData, repositoryStateId, repoMetaVersion, Function.identity(),
                     ActionListener.wrap(v -> writeUpdatedRepoDataStep.onResponse(updatedRepoData), listener::onFailure));
             }, listener::onFailure);
             // Once we have updated the repository, run the clean-ups
@@ -642,12 +644,13 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                 final ActionListener<Void> afterCleanupsListener =
                     new GroupedActionListener<>(ActionListener.wrap(() -> listener.onResponse(null)), 2);
                 asyncCleanupUnlinkedRootAndIndicesBlobs(snapshotIds, foundIndices, rootBlobs, updatedRepoData, afterCleanupsListener);
-                asyncCleanupUnlinkedShardLevelBlobs(snapshotIds, writeShardMetaDataAndComputeDeletesStep.result(), afterCleanupsListener);
+                asyncCleanupUnlinkedShardLevelBlobs(repositoryData, snapshotIds, writeShardMetaDataAndComputeDeletesStep.result(),
+                    afterCleanupsListener);
             }, listener::onFailure);
         } else {
             // Write the new repository data first (with the removed snapshot), using no shard generations
             final RepositoryData updatedRepoData = repositoryData.removeSnapshots(snapshotIds, ShardGenerations.EMPTY);
-            writeIndexGen(updatedRepoData, repositoryStateId, false, Function.identity(), ActionListener.wrap(v -> {
+            writeIndexGen(updatedRepoData, repositoryStateId, repoMetaVersion, Function.identity(), ActionListener.wrap(v -> {
                 // Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion
                 final ActionListener<Void> afterCleanupsListener =
                     new GroupedActionListener<>(ActionListener.wrap(() -> listener.onResponse(null)), 2);
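In both branches the two clean-ups run in parallel and the caller's listener fires once both have responded, which is what the `GroupedActionListener` with a group size of 2 encodes. A self-contained analogue using `CompletableFuture.allOf`:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch: run two independent cleanup tasks in parallel and signal a single
// completion once both are done, mirroring GroupedActionListener(..., 2).
public class ParallelCleanups {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        CompletableFuture<Void> rootAndIndicesCleanup = CompletableFuture.runAsync(
            () -> System.out.println("cleaning unreferenced root/index blobs"), executor);
        CompletableFuture<Void> shardLevelCleanup = CompletableFuture.runAsync(
            () -> System.out.println("cleaning unlinked shard-level blobs"), executor);
        // The equivalent of the grouped listener firing after both children respond.
        CompletableFuture.allOf(rootAndIndicesCleanup, shardLevelCleanup)
            .whenComplete((v, e) ->
                System.out.println(e == null ? "delete finished" : "delete failed: " + e))
            .join();
        executor.shutdown();
    }
}
```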
@@ -655,7 +658,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                 final StepListener<Collection<ShardSnapshotMetaDeleteResult>> writeMetaAndComputeDeletesStep = new StepListener<>();
                 writeUpdatedShardMetaDataAndComputeDeletes(snapshotIds, repositoryData, false, writeMetaAndComputeDeletesStep);
                 writeMetaAndComputeDeletesStep.whenComplete(deleteResults ->
-                    asyncCleanupUnlinkedShardLevelBlobs(snapshotIds, deleteResults, afterCleanupsListener),
+                    asyncCleanupUnlinkedShardLevelBlobs(repositoryData, snapshotIds, deleteResults, afterCleanupsListener),
                     afterCleanupsListener::onFailure);
             }, listener::onFailure));
         }
@@ -669,14 +672,14 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
             l -> cleanupStaleBlobs(deletedSnapshots, foundIndices, rootBlobs, updatedRepoData, ActionListener.map(l, ignored -> null))));
     }

-    private void asyncCleanupUnlinkedShardLevelBlobs(Collection<SnapshotId> snapshotIds,
+    private void asyncCleanupUnlinkedShardLevelBlobs(RepositoryData oldRepositoryData, Collection<SnapshotId> snapshotIds,
                                                      Collection<ShardSnapshotMetaDeleteResult> deleteResults,
                                                      ActionListener<Void> listener) {
         threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(
             listener,
             l -> {
                 try {
-                    deleteFromContainer(blobContainer(), resolveFilesToDelete(snapshotIds, deleteResults));
+                    deleteFromContainer(blobContainer(), resolveFilesToDelete(oldRepositoryData, snapshotIds, deleteResults));
                     l.onResponse(null);
                 } catch (Exception e) {
                     logger.warn(
@@ -708,14 +711,18 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
             final Set<SnapshotId> survivingSnapshots = oldRepositoryData.getSnapshots(indexId).stream()
                 .filter(id -> snapshotIds.contains(id) == false).collect(Collectors.toSet());
             final StepListener<Collection<Integer>> shardCountListener = new StepListener<>();
-            final ActionListener<Integer> allShardCountsListener = new GroupedActionListener<>(shardCountListener, snapshotIds.size());
-            for (SnapshotId snapshotId : snapshotIds) {
+            final Collection<String> indexMetaGenerations = snapshotIds.stream().map(
+                id -> oldRepositoryData.indexMetaDataGenerations().indexMetaBlobId(id, indexId)).collect(Collectors.toSet());
+            final ActionListener<Integer> allShardCountsListener =
+                new GroupedActionListener<>(shardCountListener, indexMetaGenerations.size());
+            final BlobContainer indexContainer = indexContainer(indexId);
+            for (String indexMetaGeneration : indexMetaGenerations) {
                 executor.execute(ActionRunnable.supply(allShardCountsListener, () -> {
                     try {
-                        return getSnapshotIndexMetadata(snapshotId, indexId).getNumberOfShards();
+                        return indexMetadataFormat.read(indexContainer, indexMetaGeneration).getNumberOfShards();
                     } catch (Exception ex) {
                         logger.warn(() -> new ParameterizedMessage(
-                            "[{}] [{}] failed to read metadata for index", snapshotId, indexId.getName()), ex);
+                            "[{}] [{}] failed to read metadata for index", indexMetaGeneration, indexId.getName()), ex);
                         // Just invoke the listener without any shard generations to count it down, this index will be cleaned up
                         // by the stale data cleanup in the end.
                         // TODO: Getting here means repository corruption. We should find a way of dealing with this instead of just
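Since several of the snapshots being deleted may now share a single index metadata blob, the loop above iterates over distinct blob ids rather than snapshot ids; collecting into a `Set` is what collapses the duplicates so each blob is read at most once. A small standalone sketch of that dedup step, with made-up ids:

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Sketch: map snapshots to their (possibly shared) metadata blob id and
// deduplicate, so each blob is read once instead of once per snapshot.
public class DedupMetadataReads {
    // Hypothetical snapshot -> metadata blob id resolution; two snapshots
    // of the same unchanged index resolve to the same blob.
    static String indexMetaBlobId(String snapshotId) {
        return snapshotId.startsWith("snap-a") ? "meta-blob-1" : "meta-blob-2";
    }

    public static void main(String[] args) {
        List<String> snapshotIds = List.of("snap-a1", "snap-a2", "snap-b1");
        Set<String> blobIds = snapshotIds.stream()
            .map(DedupMetadataReads::indexMetaBlobId)
            .collect(Collectors.toSet());
        System.out.println(blobIds); // two reads for three snapshots
    }
}
```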
@@ -770,20 +777,22 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
         }
     }

-    private List<String> resolveFilesToDelete(Collection<SnapshotId> snapshotIds,
+    private List<String> resolveFilesToDelete(RepositoryData oldRepositoryData, Collection<SnapshotId> snapshotIds,
                                               Collection<ShardSnapshotMetaDeleteResult> deleteResults) {
         final String basePath = basePath().buildAsString();
         final int basePathLen = basePath.length();
+        final Map<IndexId, Collection<String>> indexMetaGenerations =
+            oldRepositoryData.indexMetaDataToRemoveAfterRemovingSnapshots(snapshotIds);
         return Stream.concat(
-            deleteResults.stream().flatMap(shardResult -> {
-                final String shardPath =
-                    shardContainer(shardResult.indexId, shardResult.shardId).path().buildAsString();
-                return shardResult.blobsToDelete.stream().map(blob -> shardPath + blob);
-            }),
-            deleteResults.stream().map(shardResult -> shardResult.indexId).distinct().flatMap(indexId -> {
-                final String indexContainerPath = indexContainer(indexId).path().buildAsString();
-                return snapshotIds.stream().map(snapshotId -> indexContainerPath + globalMetadataFormat.blobName(snapshotId.getUUID()));
-            })
+            deleteResults.stream().flatMap(shardResult -> {
+                final String shardPath =
+                    shardContainer(shardResult.indexId, shardResult.shardId).path().buildAsString();
+                return shardResult.blobsToDelete.stream().map(blob -> shardPath + blob);
+            }),
+            indexMetaGenerations.entrySet().stream().flatMap(entry -> {
+                final String indexContainerPath = indexContainer(entry.getKey()).path().buildAsString();
+                return entry.getValue().stream().map(id -> indexContainerPath + indexMetadataFormat.blobName(id));
+            })
         ).map(absolutePath -> {
             assert absolutePath.startsWith(basePath);
             return absolutePath.substring(basePathLen);
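`resolveFilesToDelete` returns paths relative to the repository root, hence the assert-and-substring at the end of the hunk. A compact sketch of the concat-then-relativize shape, with hypothetical paths:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Sketch: concatenate two path streams (shard blobs, index metadata blobs)
// and strip the repository base path, as resolveFilesToDelete does.
public class RelativizeBlobPaths {
    public static void main(String[] args) {
        String basePath = "repo/base/";
        Stream<String> shardBlobs = Stream.of(basePath + "indices/idx1/0/snap-1.dat");
        Stream<String> indexMetaBlobs = Stream.of(basePath + "indices/idx1/meta-uuid1.dat");
        List<String> toDelete = Stream.concat(shardBlobs, indexMetaBlobs)
            .map(absolutePath -> {
                assert absolutePath.startsWith(basePath);
                return absolutePath.substring(basePath.length());
            })
            .collect(Collectors.toList());
        System.out.println(toDelete);
    }
}
```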
@@ -827,6 +836,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
      * Runs cleanup actions on the repository. Increments the repository state id by one before executing any modifications on the
      * repository.
      * TODO: Add shard level cleanups
+     * TODO: Add unreferenced index metadata cleanup
      * <ul>
      *     <li>Deleting stale indices {@link #cleanupStaleIndices}</li>
      *     <li>Deleting unreferenced root level blobs {@link #cleanupStaleRootFiles}</li>
@@ -851,7 +861,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
             listener.onResponse(new RepositoryCleanupResult(DeleteResult.ZERO));
         } else {
             // write new index-N blob to ensure concurrent operations will fail
-            writeIndexGen(repositoryData, repositoryStateId, SnapshotsService.useShardGenerations(repositoryMetaVersion),
+            writeIndexGen(repositoryData, repositoryStateId, repositoryMetaVersion,
                 Function.identity(), ActionListener.wrap(v -> cleanupStaleBlobs(Collections.emptyList(), foundIndices, rootBlobs,
                     repositoryData, ActionListener.map(listener, RepositoryCleanupResult::new)), listener::onFailure));
         }
@@ -975,48 +985,81 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
         final boolean writeShardGens = SnapshotsService.useShardGenerations(repositoryMetaVersion);
         final Consumer<Exception> onUpdateFailure =
             e -> listener.onFailure(new SnapshotException(metadata.name(), snapshotId, "failed to update snapshot in repository", e));
-        final ActionListener<SnapshotInfo> allMetaListener = new GroupedActionListener<>(
-            ActionListener.wrap(snapshotInfos -> {
-                assert snapshotInfos.size() == 1 : "Should have only received a single SnapshotInfo but received " + snapshotInfos;
-                final SnapshotInfo snapshotInfo = snapshotInfos.iterator().next();
-                getRepositoryData(ActionListener.wrap(existingRepositoryData -> {
-                    final RepositoryData updatedRepositoryData =
-                        existingRepositoryData.addSnapshot(snapshotId, snapshotInfo.state(), Version.CURRENT, shardGenerations);
-                    writeIndexGen(updatedRepositoryData, repositoryStateId, writeShardGens, stateTransformer,
-                        ActionListener.wrap(writtenRepoData -> {
-                            if (writeShardGens) {
-                                cleanupOldShardGens(existingRepositoryData, updatedRepositoryData);
-                            }
-                            listener.onResponse(new Tuple<>(writtenRepoData, snapshotInfo));
-                        }, onUpdateFailure));
-                }, onUpdateFailure));
-            }, onUpdateFailure), 2 + indices.size());
+
         final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);

-        // We ignore all FileAlreadyExistsException when writing metadata since otherwise a master failover while in this method will
-        // mean that no snap-${uuid}.dat blob is ever written for this snapshot. This is safe because any updated version of the
-        // index or global metadata will be compatible with the segments written in this snapshot as well.
-        // Failing on an already existing index-${repoGeneration} below ensures that the index.latest blob is not updated in a way
-        // that decrements the generation it points at
+        final boolean writeIndexGens = SnapshotsService.useIndexGenerations(repositoryMetaVersion);

-        // Write Global Metadata
-        executor.execute(ActionRunnable.run(allMetaListener,
-            () -> globalMetadataFormat.write(clusterMetadata, blobContainer(), snapshotId.getUUID(), false)));
+        final StepListener<RepositoryData> repoDataListener = new StepListener<>();
+        executor.execute(ActionRunnable.wrap(repoDataListener, this::getRepositoryData));
+        repoDataListener.whenComplete(existingRepositoryData -> {

-        // write the index metadata for each index in the snapshot
-        for (IndexId index : indices) {
-            executor.execute(ActionRunnable.run(allMetaListener, () ->
-                indexMetadataFormat.write(clusterMetadata.index(index.getName()), indexContainer(index), snapshotId.getUUID(), false)));
-        }
+            final Map<IndexId, String> indexMetas;
+            final Map<String, String> indexMetaIdentifiers;
+            if (writeIndexGens) {
+                indexMetaIdentifiers = ConcurrentCollections.newConcurrentMap();
+                indexMetas = ConcurrentCollections.newConcurrentMap();
+            } else {
+                indexMetas = null;
+                indexMetaIdentifiers = null;
+            }

-        executor.execute(ActionRunnable.supply(allMetaListener, () -> {
-            final SnapshotInfo snapshotInfo = new SnapshotInfo(snapshotId,
-                indices.stream().map(IndexId::getName).collect(Collectors.toList()),
-                startTime, failure, threadPool.absoluteTimeInMillis(), totalShards, shardFailures,
-                includeGlobalState, userMetadata);
-            snapshotFormat.write(snapshotInfo, blobContainer(), snapshotId.getUUID(), false);
-            return snapshotInfo;
-        }));
+            final ActionListener<SnapshotInfo> allMetaListener = new GroupedActionListener<>(
+                ActionListener.wrap(snapshotInfos -> {
+                    assert snapshotInfos.size() == 1 : "Should have only received a single SnapshotInfo but received " + snapshotInfos;
+                    final SnapshotInfo snapshotInfo = snapshotInfos.iterator().next();
+                    final RepositoryData updatedRepositoryData = existingRepositoryData.addSnapshot(
+                        snapshotId, snapshotInfo.state(), Version.CURRENT, shardGenerations, indexMetas, indexMetaIdentifiers);
+                    writeIndexGen(updatedRepositoryData, repositoryStateId, repositoryMetaVersion, stateTransformer,
+                        ActionListener.wrap(
+                            newRepoData -> {
+                                if (writeShardGens) {
+                                    cleanupOldShardGens(existingRepositoryData, updatedRepositoryData);
+                                }
+                                listener.onResponse(new Tuple<>(newRepoData, snapshotInfo));
+                            }, onUpdateFailure));
+                }, onUpdateFailure), 2 + indices.size());
+
+            // We ignore all FileAlreadyExistsException when writing metadata since otherwise a master failover while in this method will
+            // mean that no snap-${uuid}.dat blob is ever written for this snapshot. This is safe because any updated version of the
+            // index or global metadata will be compatible with the segments written in this snapshot as well.
+            // Failing on an already existing index-${repoGeneration} below ensures that the index.latest blob is not updated in a way
+            // that decrements the generation it points at
+
+            // Write Global MetaData
+            executor.execute(ActionRunnable.run(allMetaListener,
+                () -> globalMetadataFormat.write(clusterMetadata, blobContainer(), snapshotId.getUUID(), false)));
+
+            // write the index metadata for each index in the snapshot
+            for (IndexId index : indices) {
+                executor.execute(ActionRunnable.run(allMetaListener, () -> {
+                    final IndexMetadata indexMetaData = clusterMetadata.index(index.getName());
+                    if (writeIndexGens) {
+                        final String identifiers = IndexMetaDataGenerations.buildUniqueIdentifier(indexMetaData);
+                        String metaUUID = existingRepositoryData.indexMetaDataGenerations().getIndexMetaBlobId(identifiers);
+                        if (metaUUID == null) {
+                            // We don't yet have this version of the metadata so we write it
+                            metaUUID = UUIDs.base64UUID();
+                            indexMetadataFormat.write(indexMetaData, indexContainer(index), metaUUID, false);
+                            indexMetaIdentifiers.put(identifiers, metaUUID);
+                        }
+                        indexMetas.put(index, identifiers);
+                    } else {
+                        indexMetadataFormat.write(
+                            clusterMetadata.index(index.getName()), indexContainer(index), snapshotId.getUUID(), false);
+                    }
+                }
+                ));
+            }
+            executor.execute(ActionRunnable.supply(allMetaListener, () -> {
+                final SnapshotInfo snapshotInfo = new SnapshotInfo(snapshotId,
+                    indices.stream().map(IndexId::getName).collect(Collectors.toList()),
+                    startTime, failure, threadPool.absoluteTimeInMillis(), totalShards, shardFailures,
+                    includeGlobalState, userMetadata);
+                snapshotFormat.write(snapshotInfo, blobContainer(), snapshotId.getUUID(), false);
+                return snapshotInfo;
+            }));
+        }, onUpdateFailure);
     }

     // Delete all old shard gen blobs that aren't referenced any longer as a result from moving to updated repository data
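The core of the deduplication is in the per-index write above: the index metadata is reduced to a unique identifier, and a new blob (keyed by a fresh UUID) is written only when that identifier is not already known to the repository; otherwise the snapshot merely records a pointer to the existing blob. A self-contained sketch of that idea, substituting a SHA-256 content hash for `IndexMetaDataGenerations.buildUniqueIdentifier` (which, per this diff, is derived from the `IndexMetadata` rather than being a plain hash):

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.HexFormat;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Sketch: write an index metadata blob only when its content identifier is
// new; otherwise reuse the blob written by an earlier snapshot.
public class MetadataDedup {
    // identifier -> blob id of already-written metadata (stands in for the
    // mapping kept in RepositoryData / IndexMetaDataGenerations).
    private final Map<String, String> knownBlobs = new ConcurrentHashMap<>();

    String store(String indexMetadataJson) throws Exception {
        MessageDigest digest = MessageDigest.getInstance("SHA-256");
        String identifier = HexFormat.of().formatHex(
            digest.digest(indexMetadataJson.getBytes(StandardCharsets.UTF_8)));
        return knownBlobs.computeIfAbsent(identifier, id -> {
            String blobId = UUID.randomUUID().toString();
            System.out.println("writing new blob " + blobId); // blob store write would go here
            return blobId;
        });
    }

    public static void main(String[] args) throws Exception {
        MetadataDedup dedup = new MetadataDedup();
        String first = dedup.store("{\"index\":\"logs\",\"mappings\":{}}");
        String second = dedup.store("{\"index\":\"logs\",\"mappings\":{}}");
        System.out.println("deduplicated: " + first.equals(second)); // true
    }
}
```

Unlike `computeIfAbsent` here, the change above checks and writes non-atomically; an occasional duplicate write is tolerable because each write uses a fresh UUID and the winning mapping is recorded in the repository data.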
@@ -1056,9 +1099,10 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
     }

     @Override
-    public IndexMetadata getSnapshotIndexMetadata(final SnapshotId snapshotId, final IndexId index) throws IOException {
+    public IndexMetadata getSnapshotIndexMetaData(RepositoryData repositoryData, SnapshotId snapshotId, IndexId index) throws IOException {
         try {
-            return indexMetadataFormat.read(indexContainer(index), snapshotId.getUUID());
+            return indexMetadataFormat.read(indexContainer(index),
+                repositoryData.indexMetaDataGenerations().indexMetaBlobId(snapshotId, index));
         } catch (NoSuchFileException e) {
             throw new SnapshotMissingException(metadata.name(), snapshotId, e);
         }
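On the read side the blob name is no longer derivable from the snapshot UUID alone: it must be resolved through the generations mapping carried in `RepositoryData`. A sketch of that indirection; the fallback to legacy snapshot-UUID naming for snapshots that predate this change is an assumption for illustration, not something shown in this hunk:

```java
import java.util.Map;

// Sketch: resolve which metadata blob to read for a (snapshot, index) pair.
public class MetaBlobLookup {
    // (snapshotId + "/" + indexId) -> blob id, as recorded at snapshot time.
    private final Map<String, String> generations;

    MetaBlobLookup(Map<String, String> generations) {
        this.generations = generations;
    }

    String indexMetaBlobId(String snapshotId, String indexId) {
        // Fall back to the legacy naming (blob keyed by snapshot UUID) when no
        // deduplicated generation was recorded -- an assumed behaviour here.
        return generations.getOrDefault(snapshotId + "/" + indexId, snapshotId);
    }

    public static void main(String[] args) {
        MetaBlobLookup lookup = new MetaBlobLookup(Map.of("snap-1/idx-a", "blob-77"));
        System.out.println(lookup.indexMetaBlobId("snap-1", "idx-a")); // blob-77
        System.out.println(lookup.indexMetaBlobId("snap-0", "idx-a")); // legacy: snap-0
    }
}
```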
@@ -1259,7 +1303,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
         try {
             try (StreamOutput tmp = CompressorFactory.COMPRESSOR.streamOutput(out);
                  XContentBuilder builder = XContentFactory.jsonBuilder(tmp)) {
-                updated.snapshotsToXContent(builder, true);
+                updated.snapshotsToXContent(builder, Version.CURRENT);
             }
             serialized = out.bytes();
             final int len = serialized.length();
@@ -1392,11 +1436,11 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
      *
      * @param repositoryData RepositoryData to write
      * @param expectedGen    expected repository generation at the start of the operation
-     * @param writeShardGens whether to write {@link ShardGenerations} to the new {@link RepositoryData} blob
+     * @param version        version of the repository metadata to write
      * @param stateFilter    filter for the last cluster state update executed by this method
      * @param listener       completion listener
      */
-    protected void writeIndexGen(RepositoryData repositoryData, long expectedGen, boolean writeShardGens,
+    protected void writeIndexGen(RepositoryData repositoryData, long expectedGen, Version version,
                                  Function<ClusterState, ClusterState> stateFilter, ActionListener<RepositoryData> listener) {
         assert isReadOnly() == false; // can not write to a read only repository
         final long currentGen = repositoryData.getGenId();
@@ -1504,7 +1548,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
         final String indexBlob = INDEX_FILE_PREFIX + Long.toString(newGen);
         logger.debug("Repository [{}] writing new index generational blob [{}]", metadata.name(), indexBlob);
         writeAtomic(indexBlob,
-            BytesReference.bytes(filteredRepositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), writeShardGens)), true);
+            BytesReference.bytes(
+                filteredRepositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), version)), true);
         // write the current generation to the index-latest file
         final BytesReference genBytes;
         try (BytesStreamOutput bStream = new BytesStreamOutput()) {
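Threading a `Version` into `writeIndexGen` (and on into `snapshotsToXContent`) means the same `RepositoryData` can be rendered in whichever on-disk format the cluster's minimum node version understands. A toy sketch of version-gated serialization, with invented field names and gate values:

```java
// Sketch: emit newer fields only when the requested metadata version allows
// them, mirroring snapshotsToXContent(builder, version).
public class VersionedWriter {
    static String toJson(int version) {
        StringBuilder json = new StringBuilder("{\"snapshots\":[]");
        if (version >= 76) { // assumption: shard generations gate
            json.append(",\"shard_generations\":{}");
        }
        if (version >= 78) { // assumption: index generations gate
            json.append(",\"index_metadata_identifiers\":{}");
        }
        return json.append('}').toString();
    }

    public static void main(String[] args) {
        System.out.println(toJson(75)); // oldest format
        System.out.println(toJson(78)); // newest format
    }
}
```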