|
@@ -478,34 +478,15 @@ public class RecoverySourceHandler {
|
|
|
});
|
|
|
}
|
|
|
|
|
|
- static final class SendFileResult {
|
|
|
- final List<String> phase1FileNames;
|
|
|
- final List<Long> phase1FileSizes;
|
|
|
- final long totalSize;
|
|
|
-
|
|
|
- final List<String> phase1ExistingFileNames;
|
|
|
- final List<Long> phase1ExistingFileSizes;
|
|
|
- final long existingTotalSize;
|
|
|
-
|
|
|
- final TimeValue took;
|
|
|
-
|
|
|
- SendFileResult(
|
|
|
- List<String> phase1FileNames,
|
|
|
- List<Long> phase1FileSizes,
|
|
|
- long totalSize,
|
|
|
- List<String> phase1ExistingFileNames,
|
|
|
- List<Long> phase1ExistingFileSizes,
|
|
|
- long existingTotalSize,
|
|
|
- TimeValue took
|
|
|
- ) {
|
|
|
- this.phase1FileNames = phase1FileNames;
|
|
|
- this.phase1FileSizes = phase1FileSizes;
|
|
|
- this.totalSize = totalSize;
|
|
|
- this.phase1ExistingFileNames = phase1ExistingFileNames;
|
|
|
- this.phase1ExistingFileSizes = phase1ExistingFileSizes;
|
|
|
- this.existingTotalSize = existingTotalSize;
|
|
|
- this.took = took;
|
|
|
- }
|
|
|
+ record SendFileResult(
|
|
|
+ List<String> phase1FileNames,
|
|
|
+ List<Long> phase1FileSizes,
|
|
|
+ long totalSize,
|
|
|
+ List<String> phase1ExistingFileNames,
|
|
|
+ List<Long> phase1ExistingFileSizes,
|
|
|
+ long existingTotalSize,
|
|
|
+ TimeValue took
|
|
|
+ ) {
|
|
|
|
|
|
static final SendFileResult EMPTY = new SendFileResult(
|
|
|
Collections.emptyList(),
|
|
@@ -672,7 +653,7 @@ public class RecoverySourceHandler {
|
|
|
shardRecoveryPlan.getSnapshotFilesToRecover().size(),
|
|
|
ByteSizeValue.ofBytes(
|
|
|
shardRecoveryPlan.getSnapshotFilesToRecover()
|
|
|
- .getSnapshotFiles()
|
|
|
+ .snapshotFiles()
|
|
|
.stream()
|
|
|
.mapToLong(BlobStoreIndexShardSnapshot.FileInfo::length)
|
|
|
.sum()
|
|
@@ -702,32 +683,30 @@ public class RecoverySourceHandler {
|
|
|
sendFileInfoStep
|
|
|
);
|
|
|
|
|
|
- sendFileInfoStep.whenComplete(unused -> {
|
|
|
- recoverSnapshotFiles(shardRecoveryPlan, new ActionListener<>() {
|
|
|
- @Override
|
|
|
- public void onResponse(List<StoreFileMetadata> filesFailedToRecoverFromSnapshot) {
|
|
|
- recoverSnapshotFilesStep.onResponse(Tuple.tuple(shardRecoveryPlan, filesFailedToRecoverFromSnapshot));
|
|
|
- }
|
|
|
+ sendFileInfoStep.whenComplete(unused -> recoverSnapshotFiles(shardRecoveryPlan, new ActionListener<>() {
|
|
|
+ @Override
|
|
|
+ public void onResponse(List<StoreFileMetadata> filesFailedToRecoverFromSnapshot) {
|
|
|
+ recoverSnapshotFilesStep.onResponse(Tuple.tuple(shardRecoveryPlan, filesFailedToRecoverFromSnapshot));
|
|
|
+ }
|
|
|
|
|
|
- @Override
|
|
|
- public void onFailure(Exception e) {
|
|
|
- if (shardRecoveryPlan.canRecoverSnapshotFilesFromSourceNode() == false
|
|
|
- && e instanceof CancellableThreads.ExecutionCancelledException == false) {
|
|
|
- ShardRecoveryPlan fallbackPlan = shardRecoveryPlan.getFallbackPlan();
|
|
|
- recoveryTarget.receiveFileInfo(
|
|
|
- fallbackPlan.getFilesToRecoverNames(),
|
|
|
- fallbackPlan.getFilesToRecoverSizes(),
|
|
|
- fallbackPlan.getFilesPresentInTargetNames(),
|
|
|
- fallbackPlan.getFilesPresentInTargetSizes(),
|
|
|
- fallbackPlan.getTranslogOps(),
|
|
|
- recoverSnapshotFilesStep.map(r -> Tuple.tuple(fallbackPlan, Collections.emptyList()))
|
|
|
- );
|
|
|
- } else {
|
|
|
- recoverSnapshotFilesStep.onFailure(e);
|
|
|
- }
|
|
|
+ @Override
|
|
|
+ public void onFailure(Exception e) {
|
|
|
+ if (shardRecoveryPlan.canRecoverSnapshotFilesFromSourceNode() == false
|
|
|
+ && e instanceof CancellableThreads.ExecutionCancelledException == false) {
|
|
|
+ ShardRecoveryPlan fallbackPlan = shardRecoveryPlan.getFallbackPlan();
|
|
|
+ recoveryTarget.receiveFileInfo(
|
|
|
+ fallbackPlan.getFilesToRecoverNames(),
|
|
|
+ fallbackPlan.getFilesToRecoverSizes(),
|
|
|
+ fallbackPlan.getFilesPresentInTargetNames(),
|
|
|
+ fallbackPlan.getFilesPresentInTargetSizes(),
|
|
|
+ fallbackPlan.getTranslogOps(),
|
|
|
+ recoverSnapshotFilesStep.map(r -> Tuple.tuple(fallbackPlan, Collections.emptyList()))
|
|
|
+ );
|
|
|
+ } else {
|
|
|
+ recoverSnapshotFilesStep.onFailure(e);
|
|
|
}
|
|
|
- });
|
|
|
- }, listener::onFailure);
|
|
|
+ }
|
|
|
+ }), listener::onFailure);
|
|
|
|
|
|
recoverSnapshotFilesStep.whenComplete(planAndFilesFailedToRecoverFromSnapshot -> {
|
|
|
ShardRecoveryPlan recoveryPlan = planAndFilesFailedToRecoverFromSnapshot.v1();
|
|
@@ -747,12 +726,13 @@ public class RecoverySourceHandler {
|
|
|
);
|
|
|
}, listener::onFailure);
|
|
|
|
|
|
- sendFilesStep.whenComplete(recoveryPlan -> {
|
|
|
- createRetentionLease(
|
|
|
+ sendFilesStep.whenComplete(
|
|
|
+ recoveryPlan -> createRetentionLease(
|
|
|
recoveryPlan.getStartingSeqNo(),
|
|
|
createRetentionLeaseStep.map(retentionLease -> Tuple.tuple(recoveryPlan, retentionLease))
|
|
|
- );
|
|
|
- }, listener::onFailure);
|
|
|
+ ),
|
|
|
+ listener::onFailure
|
|
|
+ );
|
|
|
|
|
|
createRetentionLeaseStep.whenComplete(recoveryPlanAndRetentionLease -> {
|
|
|
final ShardRecoveryPlan recoveryPlan = recoveryPlanAndRetentionLease.v1();
|
|
@@ -821,9 +801,7 @@ public class RecoverySourceHandler {
|
|
|
this.snapshotFilesToRecover = shardRecoveryPlan.getSnapshotFilesToRecover();
|
|
|
this.listener = listener;
|
|
|
this.countDown = new CountDown(shardRecoveryPlan.getSnapshotFilesToRecover().size());
|
|
|
- this.pendingSnapshotFilesToRecover = new LinkedBlockingQueue<>(
|
|
|
- shardRecoveryPlan.getSnapshotFilesToRecover().getSnapshotFiles()
|
|
|
- );
|
|
|
+ this.pendingSnapshotFilesToRecover = new LinkedBlockingQueue<>(shardRecoveryPlan.getSnapshotFilesToRecover().snapshotFiles());
|
|
|
}
|
|
|
|
|
|
void start() {
|
|
@@ -876,8 +854,8 @@ public class RecoverySourceHandler {
|
|
|
|
|
|
trackOutstandingRequest(requestFuture);
|
|
|
recoveryTarget.restoreFileFromSnapshot(
|
|
|
- snapshotFilesToRecover.getRepository(),
|
|
|
- snapshotFilesToRecover.getIndexId(),
|
|
|
+ snapshotFilesToRecover.repository(),
|
|
|
+ snapshotFilesToRecover.indexId(),
|
|
|
snapshotFileToRecover,
|
|
|
ActionListener.runBefore(requestFuture, () -> unTrackOutstandingRequest(requestFuture))
|
|
|
);
|
|
@@ -1127,20 +1105,9 @@ public class RecoverySourceHandler {
|
|
|
sender.start();
|
|
|
}
|
|
|
|
|
|
- private static class OperationChunkRequest implements MultiChunkTransfer.ChunkRequest {
|
|
|
- final List<Translog.Operation> operations;
|
|
|
- final boolean lastChunk;
|
|
|
-
|
|
|
- OperationChunkRequest(List<Translog.Operation> operations, boolean lastChunk) {
|
|
|
- this.operations = operations;
|
|
|
- this.lastChunk = lastChunk;
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public boolean lastChunk() {
|
|
|
- return lastChunk;
|
|
|
- }
|
|
|
- }
|
|
|
+ private record OperationChunkRequest(List<Translog.Operation> operations, boolean lastChunk)
|
|
|
+ implements
|
|
|
+ MultiChunkTransfer.ChunkRequest {}
|
|
|
|
|
|
private class OperationBatchSender extends MultiChunkTransfer<Translog.Snapshot, OperationChunkRequest> {
|
|
|
private final long startingSeqNo;
|
|
@@ -1292,17 +1259,7 @@ public class RecoverySourceHandler {
|
|
|
listener.onResponse(null);
|
|
|
}
|
|
|
|
|
|
- static final class SendSnapshotResult {
|
|
|
- final long targetLocalCheckpoint;
|
|
|
- final int sentOperations;
|
|
|
- final TimeValue tookTime;
|
|
|
-
|
|
|
- SendSnapshotResult(final long targetLocalCheckpoint, final int sentOperations, final TimeValue tookTime) {
|
|
|
- this.targetLocalCheckpoint = targetLocalCheckpoint;
|
|
|
- this.sentOperations = sentOperations;
|
|
|
- this.tookTime = tookTime;
|
|
|
- }
|
|
|
- }
|
|
|
+ record SendSnapshotResult(long targetLocalCheckpoint, int sentOperations, TimeValue tookTime) {}
|
|
|
|
|
|
/**
|
|
|
* Cancels the recovery and interrupts all eligible threads.
|
|
@@ -1324,25 +1281,10 @@ public class RecoverySourceHandler {
|
|
|
+ '}';
|
|
|
}
|
|
|
|
|
|
- private static class FileChunk implements MultiChunkTransfer.ChunkRequest, Releasable {
|
|
|
- final StoreFileMetadata md;
|
|
|
- final BytesReference content;
|
|
|
- final long position;
|
|
|
- final boolean lastChunk;
|
|
|
- final Releasable onClose;
|
|
|
-
|
|
|
- FileChunk(StoreFileMetadata md, BytesReference content, long position, boolean lastChunk, Releasable onClose) {
|
|
|
- this.md = md;
|
|
|
- this.content = content;
|
|
|
- this.position = position;
|
|
|
- this.lastChunk = lastChunk;
|
|
|
- this.onClose = onClose;
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public boolean lastChunk() {
|
|
|
- return lastChunk;
|
|
|
- }
|
|
|
+ private record FileChunk(StoreFileMetadata md, BytesReference content, long position, boolean lastChunk, Releasable onClose)
|
|
|
+ implements
|
|
|
+ MultiChunkTransfer.ChunkRequest,
|
|
|
+ Releasable {
|
|
|
|
|
|
@Override
|
|
|
public void close() {
|