|
@@ -418,6 +418,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|
|
}
|
|
|
final var survivingIndices = updatedRepositoryData.getIndices();
|
|
|
deleteIndices(
|
|
|
+ updatedRepositoryData,
|
|
|
Optional.ofNullable(finalSnapshotInfo)
|
|
|
.map(info -> info.indices().stream().filter(survivingIndices::containsKey)
|
|
|
.map(updatedRepositoryData::resolveIndexId).collect(Collectors.toList()))
|
|
@@ -502,7 +503,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private void deleteIndices(List<IndexId> indices, SnapshotId snapshotId, ActionListener<Void> listener) {
|
|
|
+ private void deleteIndices(RepositoryData repositoryData, List<IndexId> indices, SnapshotId snapshotId,
|
|
|
+ ActionListener<Void> listener) {
|
|
|
if (indices.isEmpty()) {
|
|
|
listener.onResponse(null);
|
|
|
return;
|
|
@@ -524,7 +526,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|
|
if (indexMetaData != null) {
|
|
|
for (int shardId = 0; shardId < indexMetaData.getNumberOfShards(); shardId++) {
|
|
|
try {
|
|
|
- deleteShardSnapshot(indexId, new ShardId(indexMetaData.getIndex(), shardId), snapshotId);
|
|
|
+ deleteShardSnapshot(repositoryData, indexId, new ShardId(indexMetaData.getIndex(), shardId), snapshotId);
|
|
|
} catch (SnapshotException ex) {
|
|
|
final int finalShardId = shardId;
|
|
|
logger.warn(() -> new ParameterizedMessage("[{}] failed to delete shard data for shard [{}][{}]",
|
|
@@ -951,9 +953,29 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|
|
for (SnapshotFiles point : snapshots) {
|
|
|
newSnapshotsList.add(point);
|
|
|
}
|
|
|
- // finalize the snapshot and rewrite the snapshot index with the next sequential snapshot index
|
|
|
- finalizeShard(newSnapshotsList, fileListGeneration, blobs, "snapshot creation [" + snapshotId + "]", shardContainer,
|
|
|
- shardId, snapshotId);
|
|
|
+ final String indexGeneration = Long.toString(fileListGeneration + 1);
|
|
|
+ final List<String> blobsToDelete;
|
|
|
+ try {
|
|
|
+ final BlobStoreIndexShardSnapshots updatedSnapshots = new BlobStoreIndexShardSnapshots(newSnapshotsList);
|
|
|
+ indexShardSnapshotsFormat.writeAtomic(updatedSnapshots, shardContainer, indexGeneration);
|
|
|
+ // Delete all previous index-N blobs
|
|
|
+ blobsToDelete =
|
|
|
+ blobs.keySet().stream().filter(blob -> blob.startsWith(SNAPSHOT_INDEX_PREFIX)).collect(Collectors.toList());
|
|
|
+ assert blobsToDelete.stream().mapToLong(b -> Long.parseLong(b.replaceFirst(SNAPSHOT_INDEX_PREFIX, "")))
|
|
|
+ .max().orElse(-1L) < Long.parseLong(indexGeneration)
|
|
|
+ : "Tried to delete an index-N blob newer than the current generation [" + indexGeneration + "] when deleting index-N" +
|
|
|
+ " blobs " + blobsToDelete;
|
|
|
+ } catch (IOException e) {
|
|
|
+ throw new IndexShardSnapshotFailedException(shardId,
|
|
|
+ "Failed to finalize snapshot creation [" + snapshotId + "] with shard index ["
|
|
|
+ + indexShardSnapshotsFormat.blobName(indexGeneration) + "]", e);
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ shardContainer.deleteBlobsIgnoringIfNotExists(blobsToDelete);
|
|
|
+ } catch (IOException e) {
|
|
|
+ logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete old index-N blobs during finalization",
|
|
|
+ snapshotId, shardId), e);
|
|
|
+ }
|
|
|
snapshotStatus.moveToDone(threadPool.absoluteTimeInMillis());
|
|
|
} catch (Exception e) {
|
|
|
snapshotStatus.moveToFailed(threadPool.absoluteTimeInMillis(), ExceptionsHelper.detailedMessage(e));
|
|
@@ -1041,7 +1063,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|
|
/**
|
|
|
* Delete shard snapshot
|
|
|
*/
|
|
|
- private void deleteShardSnapshot(IndexId indexId, ShardId snapshotShardId, SnapshotId snapshotId) {
|
|
|
+ private void deleteShardSnapshot(RepositoryData repositoryData, IndexId indexId, ShardId snapshotShardId, SnapshotId snapshotId) {
|
|
|
final BlobContainer shardContainer = shardContainer(indexId, snapshotShardId);
|
|
|
final Map<String, BlobMetaData> blobs;
|
|
|
try {
|
|
@@ -1056,72 +1078,55 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|
|
|
|
|
// Build a list of snapshots that should be preserved
|
|
|
List<SnapshotFiles> newSnapshotsList = new ArrayList<>();
|
|
|
+ final Set<String> survivingSnapshotNames =
|
|
|
+ repositoryData.getSnapshots(indexId).stream().map(SnapshotId::getName).collect(Collectors.toSet());
|
|
|
for (SnapshotFiles point : snapshots) {
|
|
|
- if (!point.snapshot().equals(snapshotId.getName())) {
|
|
|
+ if (survivingSnapshotNames.contains(point.snapshot())) {
|
|
|
newSnapshotsList.add(point);
|
|
|
}
|
|
|
}
|
|
|
- // finalize the snapshot and rewrite the snapshot index with the next sequential snapshot index
|
|
|
- finalizeShard(newSnapshotsList, fileListGeneration, blobs, "snapshot deletion [" + snapshotId + "]", shardContainer,
|
|
|
- snapshotShardId, snapshotId);
|
|
|
-
|
|
|
- try {
|
|
|
- shardContainer.deleteBlobIgnoringIfNotExists(indexShardSnapshotFormat.blobName(snapshotId.getUUID()));
|
|
|
- } catch (IOException e) {
|
|
|
- logger.warn(new ParameterizedMessage("[{}] [{}] failed to delete shard snapshot file", snapshotShardId, snapshotId), e);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Loads information about shard snapshot
|
|
|
- */
|
|
|
- private BlobStoreIndexShardSnapshot loadShardSnapshot(BlobContainer shardContainer, SnapshotId snapshotId) {
|
|
|
- try {
|
|
|
- return indexShardSnapshotFormat.read(shardContainer, snapshotId.getUUID());
|
|
|
- } catch (IOException ex) {
|
|
|
- throw new SnapshotException(metadata.name(), snapshotId,
|
|
|
- "failed to read shard snapshot file for [" + shardContainer.path() + ']', ex);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Writes a new index file for the shard and removes all unreferenced files from the repository.
|
|
|
- *
|
|
|
- * We need to be really careful in handling index files in case of failures to make sure we don't
|
|
|
- * have index file that points to files that were deleted.
|
|
|
- *
|
|
|
- * @param snapshots list of active snapshots in the container
|
|
|
- * @param fileListGeneration the generation number of the current snapshot index file
|
|
|
- * @param blobs list of blobs in the container
|
|
|
- * @param reason a reason explaining why the shard index file is written
|
|
|
- */
|
|
|
- private void finalizeShard(List<SnapshotFiles> snapshots, long fileListGeneration, Map<String, BlobMetaData> blobs,
|
|
|
- String reason, BlobContainer shardContainer, ShardId shardId, SnapshotId snapshotId) {
|
|
|
final String indexGeneration = Long.toString(fileListGeneration + 1);
|
|
|
try {
|
|
|
final List<String> blobsToDelete;
|
|
|
- if (snapshots.isEmpty()) {
|
|
|
+ if (newSnapshotsList.isEmpty()) {
|
|
|
// If we deleted all snapshots, we don't need to create a new index file and simply delete all the blobs we found
|
|
|
blobsToDelete = List.copyOf(blobs.keySet());
|
|
|
} else {
|
|
|
- final BlobStoreIndexShardSnapshots updatedSnapshots = new BlobStoreIndexShardSnapshots(snapshots);
|
|
|
+ final Set<String> survivingSnapshotUUIDs = repositoryData.getSnapshots(indexId).stream().map(SnapshotId::getUUID)
|
|
|
+ .collect(Collectors.toSet());
|
|
|
+ final BlobStoreIndexShardSnapshots updatedSnapshots = new BlobStoreIndexShardSnapshots(newSnapshotsList);
|
|
|
indexShardSnapshotsFormat.writeAtomic(updatedSnapshots, shardContainer, indexGeneration);
|
|
|
- // Delete all previous index-N, data-blobs that are not referenced by the new index-N and temporary blobs
|
|
|
+ // Delete all previous index-N, data- and meta-blobs and that are not referenced by the new index-N and temporary blobs
|
|
|
blobsToDelete = blobs.keySet().stream().filter(blob ->
|
|
|
blob.startsWith(SNAPSHOT_INDEX_PREFIX)
|
|
|
- || blob.startsWith(DATA_BLOB_PREFIX) && updatedSnapshots.findNameFile(canonicalName(blob)) == null
|
|
|
+ || (blob.startsWith(SNAPSHOT_PREFIX) && blob.endsWith(".dat")
|
|
|
+ && survivingSnapshotUUIDs.contains(
|
|
|
+ blob.substring(SNAPSHOT_PREFIX.length(), blob.length() - ".dat".length())) == false)
|
|
|
+ || (blob.startsWith(DATA_BLOB_PREFIX) && updatedSnapshots.findNameFile(canonicalName(blob)) == null)
|
|
|
|| FsBlobContainer.isTempBlobName(blob)).collect(Collectors.toList());
|
|
|
}
|
|
|
try {
|
|
|
shardContainer.deleteBlobsIgnoringIfNotExists(blobsToDelete);
|
|
|
} catch (IOException e) {
|
|
|
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete blobs during finalization",
|
|
|
- snapshotId, shardId), e);
|
|
|
+ snapshotId, snapshotShardId), e);
|
|
|
}
|
|
|
} catch (IOException e) {
|
|
|
- String message =
|
|
|
- "Failed to finalize " + reason + " with shard index [" + indexShardSnapshotsFormat.blobName(indexGeneration) + "]";
|
|
|
- throw new IndexShardSnapshotFailedException(shardId, message, e);
|
|
|
+ throw new IndexShardSnapshotFailedException(snapshotShardId,
|
|
|
+ "Failed to finalize snapshot deletion [" + snapshotId + "] with shard index ["
|
|
|
+ + indexShardSnapshotsFormat.blobName(indexGeneration) + "]", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Loads information about shard snapshot
|
|
|
+ */
|
|
|
+ private BlobStoreIndexShardSnapshot loadShardSnapshot(BlobContainer shardContainer, SnapshotId snapshotId) {
|
|
|
+ try {
|
|
|
+ return indexShardSnapshotFormat.read(shardContainer, snapshotId.getUUID());
|
|
|
+ } catch (IOException ex) {
|
|
|
+ throw new SnapshotException(metadata.name(), snapshotId,
|
|
|
+ "failed to read shard snapshot file for [" + shardContainer.path() + ']', ex);
|
|
|
}
|
|
|
}
|
|
|
|