@@ -1872,63 +1872,78 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
             snapshotStatus.moveToStarted(startTime, indexIncrementalFileCount,
                 indexTotalNumberOfFiles, indexIncrementalSize, indexTotalFileSize);
-            final StepListener<Collection<Void>> allFilesUploadedListener = new StepListener<>();
-            allFilesUploadedListener.whenComplete(v -> {
-                final IndexShardSnapshotStatus.Copy lastSnapshotStatus =
-                    snapshotStatus.moveToFinalize(snapshotIndexCommit.getGeneration());
-
-                // now create and write the commit point
-                final BlobStoreIndexShardSnapshot snapshot = new BlobStoreIndexShardSnapshot(snapshotId.getName(),
-                    lastSnapshotStatus.getIndexVersion(),
-                    indexCommitPointFiles,
-                    lastSnapshotStatus.getStartTime(),
-                    threadPool.absoluteTimeInMillis() - lastSnapshotStatus.getStartTime(),
-                    lastSnapshotStatus.getIncrementalFileCount(),
-                    lastSnapshotStatus.getIncrementalSize()
-                );
-
-                logger.trace("[{}] [{}] writing shard snapshot file", shardId, snapshotId);
+            final String indexGeneration;
+            final boolean writeShardGens = SnapshotsService.useShardGenerations(repositoryMetaVersion);
+            // build a new BlobStoreIndexShardSnapshot, that includes this one and all the saved ones
+            List<SnapshotFiles> newSnapshotsList = new ArrayList<>();
+            newSnapshotsList.add(new SnapshotFiles(snapshotId.getName(), indexCommitPointFiles, shardStateIdentifier));
+            for (SnapshotFiles point : snapshots) {
+                newSnapshotsList.add(point);
+            }
+            final BlobStoreIndexShardSnapshots updatedBlobStoreIndexShardSnapshots = new BlobStoreIndexShardSnapshots(newSnapshotsList);
+            final Runnable afterWriteSnapBlob;
+            if (writeShardGens) {
+                // When using shard generations we can safely write the index-${uuid} blob before writing out any of the actual data
+                // for this shard since the uuid named blob will simply not be referenced in case of error and thus we will never
+                // reference a generation that has not had all its files fully uploaded.
+                indexGeneration = UUIDs.randomBase64UUID();
                 try {
-                    INDEX_SHARD_SNAPSHOT_FORMAT.write(snapshot, shardContainer, snapshotId.getUUID(), compress);
+                    writeShardIndexBlob(shardContainer, indexGeneration, updatedBlobStoreIndexShardSnapshots);
                 } catch (IOException e) {
-                    throw new IndexShardSnapshotFailedException(shardId, "Failed to write commit point", e);
-                }
-                // build a new BlobStoreIndexShardSnapshot, that includes this one and all the saved ones
-                List<SnapshotFiles> newSnapshotsList = new ArrayList<>();
-                newSnapshotsList.add(new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles(), shardStateIdentifier));
-                for (SnapshotFiles point : snapshots) {
-                    newSnapshotsList.add(point);
+                    throw new IndexShardSnapshotFailedException(shardId,
+                        "Failed to write shard level snapshot metadata for [" + snapshotId + "] to ["
+                            + INDEX_SHARD_SNAPSHOTS_FORMAT.blobName(indexGeneration) + "]", e);
                 }
-                final List<String> blobsToDelete;
-                final String indexGeneration;
-                final boolean writeShardGens = SnapshotsService.useShardGenerations(repositoryMetaVersion);
-                if (writeShardGens) {
-                    indexGeneration = UUIDs.randomBase64UUID();
-                    blobsToDelete = Collections.emptyList();
-                } else {
-                    indexGeneration = Long.toString(Long.parseLong(fileListGeneration) + 1);
-                    // Delete all previous index-N blobs
-                    blobsToDelete = blobs.stream().filter(blob -> blob.startsWith(SNAPSHOT_INDEX_PREFIX)).collect(Collectors.toList());
-                    assert blobsToDelete.stream().mapToLong(b -> Long.parseLong(b.replaceFirst(SNAPSHOT_INDEX_PREFIX, "")))
+                afterWriteSnapBlob = () -> {};
+            } else {
+                // When not using shard generations we can only write the index-${N} blob after all other work for this shard has
+                // completed.
+                // Also, in case of numeric shard generations the data node has to take care of deleting old shard generations.
+                indexGeneration = Long.toString(Long.parseLong(fileListGeneration) + 1);
+                // Delete all previous index-N blobs
+                final List<String> blobsToDelete = blobs.stream().filter(blob -> blob.startsWith(SNAPSHOT_INDEX_PREFIX))
+                    .collect(Collectors.toList());
+                assert blobsToDelete.stream().mapToLong(b -> Long.parseLong(b.replaceFirst(SNAPSHOT_INDEX_PREFIX, "")))
                     .max().orElse(-1L) < Long.parseLong(indexGeneration)
                     : "Tried to delete an index-N blob newer than the current generation [" + indexGeneration
                     + "] when deleting index-N blobs " + blobsToDelete;
-                }
-                try {
-                    writeShardIndexBlob(shardContainer, indexGeneration, new BlobStoreIndexShardSnapshots(newSnapshotsList));
-                } catch (IOException e) {
-                    throw new IndexShardSnapshotFailedException(shardId,
-                        "Failed to finalize snapshot creation [" + snapshotId + "] with shard index ["
-                            + INDEX_SHARD_SNAPSHOTS_FORMAT.blobName(indexGeneration) + "]", e);
-                }
-                if (writeShardGens == false) {
+                afterWriteSnapBlob = () -> {
+                    try {
+                        writeShardIndexBlob(shardContainer, indexGeneration, updatedBlobStoreIndexShardSnapshots);
+                    } catch (IOException e) {
+                        throw new IndexShardSnapshotFailedException(shardId,
+                            "Failed to finalize snapshot creation [" + snapshotId + "] with shard index ["
+                                + INDEX_SHARD_SNAPSHOTS_FORMAT.blobName(indexGeneration) + "]", e);
+                    }
                     try {
                         deleteFromContainer(shardContainer, blobsToDelete);
                     } catch (IOException e) {
                         logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete old index-N blobs during finalization",
-                            snapshotId, shardId), e);
+                                snapshotId, shardId), e);
                     }
+                };
+            }
+
+            final StepListener<Collection<Void>> allFilesUploadedListener = new StepListener<>();
+            allFilesUploadedListener.whenComplete(v -> {
+                final IndexShardSnapshotStatus.Copy lastSnapshotStatus =
+                    snapshotStatus.moveToFinalize(snapshotIndexCommit.getGeneration());
+
+                // now create and write the commit point
+                logger.trace("[{}] [{}] writing shard snapshot file", shardId, snapshotId);
+                try {
+                    INDEX_SHARD_SNAPSHOT_FORMAT.write(new BlobStoreIndexShardSnapshot(snapshotId.getName(),
+                        lastSnapshotStatus.getIndexVersion(),
+                        indexCommitPointFiles,
+                        lastSnapshotStatus.getStartTime(),
+                        threadPool.absoluteTimeInMillis() - lastSnapshotStatus.getStartTime(),
+                        lastSnapshotStatus.getIncrementalFileCount(),
+                        lastSnapshotStatus.getIncrementalSize()
+                    ), shardContainer, snapshotId.getUUID(), compress);
+                } catch (IOException e) {
+                    throw new IndexShardSnapshotFailedException(shardId, "Failed to write commit point", e);
                 }
+                afterWriteSnapBlob.run();
                 snapshotStatus.moveToDone(threadPool.absoluteTimeInMillis(), indexGeneration);
                 listener.onResponse(indexGeneration);
             }, listener::onFailure);
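
Note on the control flow above: the decision between an eager and a deferred write of the shard-level index-* blob is now made before any file uploads start. With shard generations the blob is UUID-named, so writing it early is safe (an unreferenced blob is simply ignored if the snapshot later fails); with legacy numeric generations the index-N write and the cleanup of older index-N blobs are captured in afterWriteSnapBlob and run only after all uploads complete. Below is a minimal, self-contained sketch of that pattern; BlobWriter, snapshotShard, and the blob names are hypothetical stand-ins for illustration, not the actual Elasticsearch APIs.

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.UUID;

// Sketch of the eager-vs-deferred shard index blob write, assuming a
// hypothetical BlobWriter abstraction in place of the real BlobContainer API.
public class ShardGenerationSketch {

    interface BlobWriter {
        void write(String blobName) throws IOException;
    }

    static String snapshotShard(boolean writeShardGens, long fileListGeneration, BlobWriter writer) {
        final String indexGeneration;
        final Runnable afterWriteSnapBlob;
        if (writeShardGens) {
            // UUID-named generation: safe to write before uploading any data,
            // because an unreferenced uuid blob is ignored if the snapshot fails.
            indexGeneration = UUID.randomUUID().toString();
            try {
                writer.write("index-" + indexGeneration);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            afterWriteSnapBlob = () -> {};
        } else {
            // Numeric generation: index-N may only appear once all data is uploaded,
            // so the write (and deletion of older index-N blobs) is deferred.
            indexGeneration = Long.toString(fileListGeneration + 1);
            afterWriteSnapBlob = () -> {
                try {
                    writer.write("index-" + indexGeneration);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
                // older index-N blobs would be deleted here
            };
        }
        // ... upload the shard's segment files here ...
        afterWriteSnapBlob.run(); // no-op in the shard-generations case
        return indexGeneration;
    }

    public static void main(String[] args) {
        String gen = snapshotShard(true, 4, blobName -> System.out.println("wrote " + blobName));
        System.out.println("generation: " + gen);
    }
}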
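For completeness, finalization itself still runs only once every file upload has finished: allFilesUploadedListener fires after the per-file upload listeners, and only then is the commit point written, afterWriteSnapBlob run, and the status moved to done. A rough analogue of that fan-in, sketched with CompletableFuture in place of Elasticsearch's StepListener/GroupedActionListener (names and structure here are illustrative assumptions):

import java.util.List;
import java.util.concurrent.CompletableFuture;

// Rough analogue of the "all files uploaded, then finalize" fan-in using
// CompletableFuture instead of Elasticsearch's StepListener.
public class UploadFanInSketch {
    public static void main(String[] args) {
        List<CompletableFuture<Void>> uploads = List.of(
            CompletableFuture.runAsync(() -> System.out.println("upload segment_1")),
            CompletableFuture.runAsync(() -> System.out.println("upload _0.cfs")));

        CompletableFuture.allOf(uploads.toArray(new CompletableFuture[0]))
            .thenRun(() -> {
                // corresponds to allFilesUploadedListener.whenComplete(...):
                // write the commit point, run afterWriteSnapBlob, move to DONE
                System.out.println("all uploads done: writing shard snapshot file");
            })
            .join();
    }
}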