@@ -39,6 +39,7 @@ import org.elasticsearch.common.StopWatch;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.lease.Releasable;
+import org.elasticsearch.common.lease.Releasables;
 import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
 import org.elasticsearch.common.unit.ByteSizeValue;
@@ -74,7 +75,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
-import java.util.function.Supplier;
+import java.util.function.IntSupplier;
 import java.util.stream.StreamSupport;

 import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
@@ -158,15 +159,21 @@ public class RecoverySourceHandler {
             final long startingSeqNo;
             final boolean isSequenceNumberBasedRecovery = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO &&
                 isTargetSameHistory() && shard.hasCompleteHistoryOperations("peer-recovery", request.startingSeqNo());
-            final SendFileResult sendFileResult;
+
+            final StepListener<SendFileResult> sendFileStep = new StepListener<>();
+            final StepListener<TimeValue> prepareEngineStep = new StepListener<>();
+            final StepListener<SendSnapshotResult> sendSnapshotStep = new StepListener<>();
+            final StepListener<Void> finalizeStep = new StepListener<>();
+
             if (isSequenceNumberBasedRecovery) {
                 logger.trace("performing sequence numbers based recovery. starting at [{}]", request.startingSeqNo());
                 startingSeqNo = request.startingSeqNo();
-                sendFileResult = SendFileResult.EMPTY;
+                sendFileStep.onResponse(SendFileResult.EMPTY);
             } else {
-                final Engine.IndexCommitRef phase1Snapshot;
+                final Engine.IndexCommitRef safeCommitRef;
                 try {
-                    phase1Snapshot = shard.acquireSafeIndexCommit();
+                    safeCommitRef = shard.acquireSafeIndexCommit();
+                    resources.add(safeCommitRef);
                 } catch (final Exception e) {
                     throw new RecoveryEngineException(shard.shardId(), 1, "snapshot failed", e);
                 }
@@ -175,24 +182,29 @@ public class RecoverySourceHandler {
                 startingSeqNo = 0;
                 try {
                     final int estimateNumOps = shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo);
-                    sendFileResult = phase1(phase1Snapshot.getIndexCommit(), shard.getLastKnownGlobalCheckpoint(), () -> estimateNumOps);
+                    shard.store().incRef();
+                    final Releasable releaseStore = Releasables.releaseOnce(shard.store()::decRef);
+                    resources.add(releaseStore);
+                    sendFileStep.whenComplete(r -> IOUtils.close(safeCommitRef, releaseStore), e -> {
+                        try {
+                            IOUtils.close(safeCommitRef, releaseStore);
+                        } catch (final IOException ex) {
+                            logger.warn("releasing snapshot caused exception", ex);
+                        }
+                    });
+                    phase1(safeCommitRef.getIndexCommit(), shard.getLastKnownGlobalCheckpoint(), () -> estimateNumOps, sendFileStep);
                 } catch (final Exception e) {
-                    throw new RecoveryEngineException(shard.shardId(), 1, "phase1 failed", e);
-                } finally {
-                    try {
-                        IOUtils.close(phase1Snapshot);
-                    } catch (final IOException ex) {
-                        logger.warn("releasing snapshot caused exception", ex);
-                    }
+                    throw new RecoveryEngineException(shard.shardId(), 1, "sendFileStep failed", e);
                 }
             }
             assert startingSeqNo >= 0 : "startingSeqNo must be non negative. got: " + startingSeqNo;

-            final StepListener<TimeValue> prepareEngineStep = new StepListener<>();
-            // For a sequence based recovery, the target can keep its local translog
-            prepareTargetForTranslog(isSequenceNumberBasedRecovery == false,
-                shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo), prepareEngineStep);
-            final StepListener<SendSnapshotResult> sendSnapshotStep = new StepListener<>();
+            sendFileStep.whenComplete(r -> {
+                // For a sequence based recovery, the target can keep its local translog
+                prepareTargetForTranslog(isSequenceNumberBasedRecovery == false,
+                    shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo), prepareEngineStep);
+            }, onFailure);
+
             prepareEngineStep.whenComplete(prepareEngineTime -> {
                 /*
                  * add shard to replication group (shard will receive replication requests from this point on) now that engine is open.
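
Illustrative note (not part of the patch): the hunk above wraps `shard.store()::decRef` in `Releasables.releaseOnce` so that the store reference can be released from either arm of `sendFileStep.whenComplete`, and again by the shared `resources` cleanup, without double-decrementing. A minimal sketch of that behaviour; the counter and the `underlying` releasable are invented for the example:

    import org.elasticsearch.common.lease.Releasable;
    import org.elasticsearch.common.lease.Releasables;

    class ReleaseOnceSketch {
        static void demo() {
            final int[] closeCount = new int[1];
            final Releasable underlying = () -> closeCount[0]++;          // stands in for shard.store()::decRef
            final Releasable releaseStore = Releasables.releaseOnce(underlying);

            releaseStore.close();   // success path releases the store
            releaseStore.close();   // failure/cleanup path may close again; the underlying release still runs once
            System.out.println(closeCount[0]); // prints 1
        }
    }
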
@@ -229,12 +241,12 @@ public class RecoverySourceHandler {

             }, onFailure);

-            final StepListener<Void> finalizeStep = new StepListener<>();
             sendSnapshotStep.whenComplete(r -> finalizeRecovery(r.targetLocalCheckpoint, finalizeStep), onFailure);

             finalizeStep.whenComplete(r -> {
                 final long phase1ThrottlingWaitTime = 0L; // TODO: return the actual throttle time
                 final SendSnapshotResult sendSnapshotResult = sendSnapshotStep.result();
+                final SendFileResult sendFileResult = sendFileStep.result();
                 final RecoveryResponse response = new RecoveryResponse(sendFileResult.phase1FileNames, sendFileResult.phase1FileSizes,
                     sendFileResult.phase1ExistingFileNames, sendFileResult.phase1ExistingFileSizes, sendFileResult.totalSize,
                     sendFileResult.existingTotalSize, sendFileResult.took.millis(), phase1ThrottlingWaitTime,
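
Illustrative note (not part of the patch): the `StepListener`s introduced above chain the recovery stages so each one starts only after the previous one completes, while any failure is routed to a shared `onFailure` handler, and earlier results remain readable via `result()`. A minimal sketch of that chaining, assuming the `org.elasticsearch.action.StepListener` API used in this diff; the step names and payload types are invented:

    import org.elasticsearch.action.ActionListener;
    import org.elasticsearch.action.StepListener;

    import java.util.function.Consumer;

    class StepListenerChainSketch {
        // Invented async steps standing in for phase1 and prepareTargetForTranslog.
        static void stepOne(ActionListener<String> listener) {
            listener.onResponse("files-sent");
        }

        static void stepTwo(String previousResult, ActionListener<Long> listener) {
            listener.onResponse(42L);
        }

        static void demo() {
            final StepListener<String> sendFileStep = new StepListener<>();
            final StepListener<Long> prepareEngineStep = new StepListener<>();
            final Consumer<Exception> onFailure = e -> System.err.println("recovery failed: " + e);

            stepOne(sendFileStep);                                                    // kick off the first step
            sendFileStep.whenComplete(r -> stepTwo(r, prepareEngineStep), onFailure); // runs only after step one completes
            prepareEngineStep.whenComplete(
                time -> System.out.println(sendFileStep.result() + " / " + time),     // earlier results stay available via result()
                onFailure);
        }
    }
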
@@ -330,18 +342,17 @@ public class RecoverySourceHandler {
      * segments that are missing. Only segments that have the same size and
      * checksum can be reused
      */
-    public SendFileResult phase1(final IndexCommit snapshot, final long globalCheckpoint, final Supplier<Integer> translogOps) {
+    void phase1(IndexCommit snapshot, long globalCheckpoint, IntSupplier translogOps, ActionListener<SendFileResult> listener) {
         cancellableThreads.checkForCancel();
         // Total size of segment files that are recovered
-        long totalSize = 0;
+        long totalSizeInBytes = 0;
         // Total size of segment files that were able to be re-used
-        long existingTotalSize = 0;
+        long existingTotalSizeInBytes = 0;
         final List<String> phase1FileNames = new ArrayList<>();
         final List<Long> phase1FileSizes = new ArrayList<>();
         final List<String> phase1ExistingFileNames = new ArrayList<>();
         final List<Long> phase1ExistingFileSizes = new ArrayList<>();
         final Store store = shard.store();
-        store.incRef();
         try {
             StopWatch stopWatch = new StopWatch().start();
             final Store.MetadataSnapshot recoverySourceMetadata;
@@ -367,12 +378,12 @@ public class RecoverySourceHandler {
                 for (StoreFileMetaData md : diff.identical) {
                     phase1ExistingFileNames.add(md.name());
                     phase1ExistingFileSizes.add(md.length());
-                    existingTotalSize += md.length();
+                    existingTotalSizeInBytes += md.length();
                     if (logger.isTraceEnabled()) {
                         logger.trace("recovery [phase1]: not recovering [{}], exist in local store and has checksum [{}]," +
                             " size [{}]", md.name(), md.checksum(), md.length());
                     }
-                    totalSize += md.length();
+                    totalSizeInBytes += md.length();
                 }
                 List<StoreFileMetaData> phase1Files = new ArrayList<>(diff.different.size() + diff.missing.size());
                 phase1Files.addAll(diff.different);
@@ -386,75 +397,33 @@ public class RecoverySourceHandler {
                     }
                     phase1FileNames.add(md.name());
                     phase1FileSizes.add(md.length());
-                    totalSize += md.length();
+                    totalSizeInBytes += md.length();
                 }

                 logger.trace("recovery [phase1]: recovering_files [{}] with total_size [{}], reusing_files [{}] with total_size [{}]",
-                    phase1FileNames.size(), new ByteSizeValue(totalSize),
-                    phase1ExistingFileNames.size(), new ByteSizeValue(existingTotalSize));
+                    phase1FileNames.size(), new ByteSizeValue(totalSizeInBytes),
+                    phase1ExistingFileNames.size(), new ByteSizeValue(existingTotalSizeInBytes));
                 cancellableThreads.execute(() -> recoveryTarget.receiveFileInfo(
-                    phase1FileNames, phase1FileSizes, phase1ExistingFileNames, phase1ExistingFileSizes, translogOps.get()));
+                    phase1FileNames, phase1FileSizes, phase1ExistingFileNames, phase1ExistingFileSizes, translogOps.getAsInt()));
                 sendFiles(store, phase1Files.toArray(new StoreFileMetaData[0]), translogOps);
-                // Send the CLEAN_FILES request, which takes all of the files that
-                // were transferred and renames them from their temporary file
-                // names to the actual file names. It also writes checksums for
-                // the files after they have been renamed.
-                //
-                // Once the files have been renamed, any other files that are not
-                // related to this recovery (out of date segments, for example)
-                // are deleted
-                try {
-                    cancellableThreads.executeIO(() ->
-                        recoveryTarget.cleanFiles(translogOps.get(), globalCheckpoint, recoverySourceMetadata));
-                } catch (RemoteTransportException | IOException targetException) {
-                    final IOException corruptIndexException;
-                    // we realized that after the index was copied and we wanted to finalize the recovery
-                    // the index was corrupted:
-                    // - maybe due to a broken segments file on an empty index (transferred with no checksum)
-                    // - maybe due to old segments without checksums or length only checks
-                    if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(targetException)) != null) {
-                        try {
-                            final Store.MetadataSnapshot recoverySourceMetadata1 = store.getMetadata(snapshot);
-                            StoreFileMetaData[] metadata =
-                                StreamSupport.stream(recoverySourceMetadata1.spliterator(), false).toArray(StoreFileMetaData[]::new);
-                            ArrayUtil.timSort(metadata, Comparator.comparingLong(StoreFileMetaData::length)); // check small files first
-                            for (StoreFileMetaData md : metadata) {
-                                cancellableThreads.checkForCancel();
-                                logger.debug("checking integrity for file {} after remove corruption exception", md);
-                                if (store.checkIntegrityNoException(md) == false) { // we are corrupted on the primary -- fail!
-                                    shard.failShard("recovery", corruptIndexException);
-                                    logger.warn("Corrupted file detected {} checksum mismatch", md);
-                                    throw corruptIndexException;
-                                }
-                            }
-                        } catch (IOException ex) {
-                            targetException.addSuppressed(ex);
-                            throw targetException;
-                        }
-                        // corruption has happened on the way to replica
-                        RemoteTransportException exception = new RemoteTransportException("File corruption occurred on recovery but " +
-                            "checksums are ok", null);
-                        exception.addSuppressed(targetException);
-                        logger.warn(() -> new ParameterizedMessage(
-                            "{} Remote file corruption during finalization of recovery on node {}. local checksum OK",
-                            shard.shardId(), request.targetNode()), corruptIndexException);
-                        throw exception;
-                    } else {
-                        throw targetException;
-                    }
-                }
+                final long totalSize = totalSizeInBytes;
+                final long existingTotalSize = existingTotalSizeInBytes;
+                cleanFiles(store, recoverySourceMetadata, translogOps, globalCheckpoint, ActionListener.map(listener, aVoid -> {
+                    final TimeValue took = stopWatch.totalTime();
+                    logger.trace("recovery [phase1]: took [{}]", took);
+                    return new SendFileResult(phase1FileNames, phase1FileSizes, totalSize, phase1ExistingFileNames,
+                        phase1ExistingFileSizes, existingTotalSize, took);
+                }));
             } else {
                 logger.trace("skipping [phase1]- identical sync id [{}] found on both source and target",
                     recoverySourceMetadata.getSyncId());
+                final TimeValue took = stopWatch.totalTime();
+                logger.trace("recovery [phase1]: took [{}]", took);
+                listener.onResponse(new SendFileResult(phase1FileNames, phase1FileSizes, totalSizeInBytes, phase1ExistingFileNames,
+                    phase1ExistingFileSizes, existingTotalSizeInBytes, took));
             }
-            final TimeValue took = stopWatch.totalTime();
-            logger.trace("recovery [phase1]: took [{}]", took);
-            return new SendFileResult(phase1FileNames, phase1FileSizes, totalSize, phase1ExistingFileNames,
-                phase1ExistingFileSizes, existingTotalSize, took);
         } catch (Exception e) {
-            throw new RecoverFilesRecoveryException(request.shardId(), phase1FileNames.size(), new ByteSizeValue(totalSize), e);
-        } finally {
-            store.decRef();
+            throw new RecoverFilesRecoveryException(request.shardId(), phase1FileNames.size(), new ByteSizeValue(totalSizeInBytes), e);
         }
     }

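
Illustrative note (not part of the patch): `phase1` no longer returns a `SendFileResult`; it completes its listener once `cleanFiles` acknowledges, and `ActionListener.map` converts that `Void` acknowledgement into the result object while failures pass through to the original listener untouched. A small sketch of the pattern, with invented method names:

    import org.elasticsearch.action.ActionListener;

    class ListenerMapSketch {
        // Invented async step that only signals completion, like cleanFiles above.
        static void cleanFilesLike(ActionListener<Void> listener) {
            listener.onResponse(null);
        }

        // Invented phase1-like method: the mapped listener turns Void into the caller's result type.
        static void phase1Like(ActionListener<String> listener) {
            final long totalSizeInBytes = 123L;
            cleanFilesLike(ActionListener.map(listener, aVoid -> "sent " + totalSizeInBytes + " bytes"));
        }

        static void demo() {
            phase1Like(ActionListener.wrap(
                result -> System.out.println(result),
                e -> System.err.println("phase1 failed: " + e)));
        }
    }
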
@@ -692,7 +661,7 @@ public class RecoverySourceHandler {
                 '}';
     }

-    void sendFiles(Store store, StoreFileMetaData[] files, Supplier<Integer> translogOps) throws Exception {
+    void sendFiles(Store store, StoreFileMetaData[] files, IntSupplier translogOps) throws Exception {
         ArrayUtil.timSort(files, Comparator.comparingLong(StoreFileMetaData::length)); // send smallest first
         final LocalCheckpointTracker requestSeqIdTracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED);
         final AtomicReference<Tuple<StoreFileMetaData, Exception>> error = new AtomicReference<>();
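
Illustrative note (not part of the patch): the `translogOps` parameter switches from `Supplier<Integer>` to `IntSupplier`, the JDK primitive specialization, so callers such as `sendFiles` and `writeFileChunk` read an `int` via `getAsInt()` without boxing. For example:

    import java.util.function.IntSupplier;
    import java.util.function.Supplier;

    class IntSupplierSketch {
        static void demo() {
            Supplier<Integer> boxed = () -> 5;   // boxes the value on every get()
            IntSupplier primitive = () -> 5;     // returns a plain int
            int total = boxed.get() + primitive.getAsInt();
            System.out.println(total);           // 10
        }
    }
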
@@ -717,7 +686,7 @@ public class RecoverySourceHandler {
                     }
                     final long requestFilePosition = position;
                     cancellableThreads.executeIO(() ->
-                        recoveryTarget.writeFileChunk(md, requestFilePosition, content, lastChunk, translogOps.get(),
+                        recoveryTarget.writeFileChunk(md, requestFilePosition, content, lastChunk, translogOps.getAsInt(),
                             ActionListener.wrap(
                                 r -> requestSeqIdTracker.markSeqNoAsProcessed(requestSeqId),
                                 e -> {
@@ -738,24 +707,53 @@ public class RecoverySourceHandler {
             cancellableThreads.execute(() -> requestSeqIdTracker.waitForProcessedOpsToComplete(requestSeqIdTracker.getMaxSeqNo()));
         }
         if (error.get() != null) {
-            handleErrorOnSendFiles(store, error.get().v1(), error.get().v2());
+            handleErrorOnSendFiles(store, error.get().v2(), new StoreFileMetaData[]{error.get().v1()});
         }
     }

-    private void handleErrorOnSendFiles(Store store, StoreFileMetaData md, Exception e) throws Exception {
-        final IOException corruptIndexException;
-        if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(e)) != null) {
-            if (store.checkIntegrityNoException(md) == false) { // we are corrupted on the primary -- fail!
-                logger.warn("{} Corrupted file detected {} checksum mismatch", shardId, md);
-                failEngine(corruptIndexException);
-                throw corruptIndexException;
+    private void cleanFiles(Store store, Store.MetadataSnapshot sourceMetadata, IntSupplier translogOps,
+                            long globalCheckpoint, ActionListener<Void> listener) {
+        // Send the CLEAN_FILES request, which takes all of the files that
+        // were transferred and renames them from their temporary file
+        // names to the actual file names. It also writes checksums for
+        // the files after they have been renamed.
+        //
+        // Once the files have been renamed, any other files that are not
+        // related to this recovery (out of date segments, for example)
+        // are deleted
+        cancellableThreads.execute(() -> recoveryTarget.cleanFiles(translogOps.getAsInt(), globalCheckpoint, sourceMetadata,
+            ActionListener.delegateResponse(listener, (l, e) -> ActionListener.completeWith(l, () -> {
+                StoreFileMetaData[] mds = StreamSupport.stream(sourceMetadata.spliterator(), false).toArray(StoreFileMetaData[]::new);
+                ArrayUtil.timSort(mds, Comparator.comparingLong(StoreFileMetaData::length)); // check small files first
+                handleErrorOnSendFiles(store, e, mds);
+                throw e;
+            }))));
+    }
+
+    private void handleErrorOnSendFiles(Store store, Exception e, StoreFileMetaData[] mds) throws Exception {
+        final IOException corruptIndexException = ExceptionsHelper.unwrapCorruption(e);
+        if (corruptIndexException != null) {
+            Exception localException = null;
+            for (StoreFileMetaData md : mds) {
+                cancellableThreads.checkForCancel();
+                logger.debug("checking integrity for file {} after remove corruption exception", md);
+                if (store.checkIntegrityNoException(md) == false) { // we are corrupted on the primary -- fail!
+                    logger.warn("{} Corrupted file detected {} checksum mismatch", shardId, md);
+                    if (localException == null) {
+                        localException = corruptIndexException;
+                    }
+                    failEngine(corruptIndexException);
+                }
+            }
+            if (localException != null) {
+                throw localException;
             } else { // corruption has happened on the way to replica
-                RemoteTransportException exception = new RemoteTransportException(
+                RemoteTransportException remoteException = new RemoteTransportException(
                     "File corruption occurred on recovery but checksums are ok", null);
-                exception.addSuppressed(e);
+                remoteException.addSuppressed(e);
                 logger.warn(() -> new ParameterizedMessage("{} Remote file corruption on node {}, recovering {}. local checksum OK",
-                    shardId, request.targetNode(), md), corruptIndexException);
-                throw exception;
+                    shardId, request.targetNode(), mds), corruptIndexException);
+                throw remoteException;
             }
         } else {
             throw e;
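
Illustrative note (not part of the patch): both `cleanFiles` and `sendFiles` now funnel failures into `handleErrorOnSendFiles`, which decides who is at fault: if any local file fails its checksum the primary itself is corrupted (fail the engine and rethrow the corruption), otherwise the corruption happened in transit or on the replica and a transport-level exception is thrown instead. A simplified sketch of that triage; `IntegrityCheck` is a hypothetical stand-in for `store.checkIntegrityNoException`:

    class CorruptionTriageSketch {
        /** Hypothetical stand-in for store.checkIntegrityNoException(md). */
        interface IntegrityCheck {
            boolean isIntact(String fileName);
        }

        static void triage(Exception remoteFailure, Exception corruption, String[] fileNames, IntegrityCheck check)
            throws Exception {
            if (corruption == null) {
                throw remoteFailure;                  // not a corruption: just propagate the original failure
            }
            boolean primaryCorrupted = false;
            for (String file : fileNames) {           // the real code checks small files first
                if (check.isIntact(file) == false) {
                    primaryCorrupted = true;          // our own copy is broken: the primary is at fault
                }
            }
            if (primaryCorrupted) {
                throw corruption;                     // the real code also fails the engine here
            }
            Exception blameRemote = new Exception("File corruption occurred on recovery but checksums are ok");
            blameRemote.addSuppressed(remoteFailure); // local files are fine: blame the transfer or the replica
            throw blameRemote;
        }
    }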