@@ -36,7 +36,9 @@ import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
+import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.CancellableThreads.Interruptable;
import org.elasticsearch.index.engine.RecoveryEngineException;
@@ -49,6 +51,7 @@ import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;

+import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
@@ -57,6 +60,7 @@ import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.StreamSupport;
@@ -77,9 +81,9 @@ public class RecoverySourceHandler {
private final StartRecoveryRequest request;
private final RecoverySettings recoverySettings;
private final TransportService transportService;
+ private final int chunkSizeInBytes;

protected final RecoveryResponse response;
- private final TransportRequestOptions requestOptions;

private final CancellableThreads cancellableThreads = new CancellableThreads() {
@Override
@@ -106,14 +110,8 @@ public class RecoverySourceHandler {
this.transportService = transportService;
this.indexName = this.request.shardId().index().name();
this.shardId = this.request.shardId().id();
-
+ this.chunkSizeInBytes = recoverySettings.getChunkSize().bytesAsInt();
this.response = new RecoveryResponse();
- this.requestOptions = TransportRequestOptions.builder()
- .withCompress(recoverySettings.compress())
- .withType(TransportRequestOptions.Type.RECOVERY)
- .withTimeout(recoverySettings.internalActionTimeout())
- .build();
-
}

/**
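Note on the removed field: with the shared requestOptions gone from the constructor, each sender builds its own TransportRequestOptions at the call site. Purely for side-by-side comparison, the two variants that appear in later hunks of this diff look roughly like this (translog batches keep compression and the long timeout, file chunks skip compression):

    // Gathered from the hunks below, for comparison only.
    TransportRequestOptions recoveryOptions = TransportRequestOptions.builder()
            .withCompress(true)                                     // translog batches compress well
            .withType(TransportRequestOptions.Type.RECOVERY)
            .withTimeout(recoverySettings.internalActionLongTimeout())
            .build();
    TransportRequestOptions chunkSendOptions = TransportRequestOptions.builder()
            .withCompress(false)                                    // Lucene file chunks are already compressed
            .withType(TransportRequestOptions.Type.RECOVERY)
            .withTimeout(recoverySettings.internalActionTimeout())
            .build();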
@@ -218,7 +216,7 @@ public class RecoverySourceHandler {
totalSize += md.length();
}
List<StoreFileMetaData> phase1Files = new ArrayList<>(diff.different.size() + diff.missing.size());
- phase1Files.addAll(diff.different);
+ phase1Files.addAll(diff.different);
phase1Files.addAll(diff.missing);
for (StoreFileMetaData md : phase1Files) {
if (request.metadataSnapshot().asMap().containsKey(md.name())) {
@@ -249,7 +247,7 @@ public class RecoverySourceHandler {
});
// How many bytes we've copied since we last called RateLimiter.pause
final AtomicLong bytesSinceLastPause = new AtomicLong();
- final Function<StoreFileMetaData, OutputStream> outputStreamFactories = (md) -> new RecoveryOutputStream(md, bytesSinceLastPause, translogView);
+ final Function<StoreFileMetaData, OutputStream> outputStreamFactories = (md) -> new BufferedOutputStream(new RecoveryOutputStream(md, bytesSinceLastPause, translogView), chunkSizeInBytes);
sendFiles(store, phase1Files.toArray(new StoreFileMetaData[phase1Files.size()]), outputStreamFactories);
cancellableThreads.execute(() -> {
// Send the CLEAN_FILES request, which takes all of the files that
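The BufferedOutputStream wrapper sized to chunkSizeInBytes is what coalesces many small writes into chunk-sized requests: the buffer absorbs byte- and small-array writes and only hands full (or final partial) chunks to the wrapped RecoveryOutputStream via write(byte[], int, int). Since java.io.BufferedOutputStream flushes through the array variant, the wrapped stream's write(int) is never invoked, which is why the override further down in this diff can simply throw. A minimal, self-contained sketch of that behavior (the class and sizes here are made up for illustration, not the real RecoveryOutputStream):

    import java.io.BufferedOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;
    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical stand-in for RecoveryOutputStream: it only accepts array writes.
    class ChunkRecordingOutputStream extends OutputStream {
        final List<Integer> chunkSizes = new ArrayList<>();

        @Override
        public void write(int b) {
            throw new UnsupportedOperationException("single-byte writes never reach the wrapped stream");
        }

        @Override
        public void write(byte[] b, int off, int len) {
            chunkSizes.add(len); // each call here would become one chunk request on the wire
        }
    }

    public class BufferedChunkDemo {
        public static void main(String[] args) throws IOException {
            int chunkSizeInBytes = 8; // stands in for recoverySettings.getChunkSize().bytesAsInt()
            ChunkRecordingOutputStream target = new ChunkRecordingOutputStream();
            try (OutputStream out = new BufferedOutputStream(target, chunkSizeInBytes)) {
                for (int i = 0; i < 20; i++) {
                    out.write(i); // tiny writes are absorbed by the buffer
                }
            } // close() flushes the final partial chunk
            System.out.println(target.chunkSizes); // [8, 8, 4]
        }
    }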
@@ -432,7 +430,7 @@ public class RecoverySourceHandler {
}

final TransportRequestOptions recoveryOptions = TransportRequestOptions.builder()
- .withCompress(recoverySettings.compress())
+ .withCompress(true)
.withType(TransportRequestOptions.Type.RECOVERY)
.withTimeout(recoverySettings.internalActionLongTimeout())
.build();
@@ -451,9 +449,9 @@ public class RecoverySourceHandler {
size += operation.estimateSize();
totalOperations++;

- // Check if this request is past the size or bytes threshold, and
+ // Check if this request is past the bytes threshold, and
// if so, send it off
- if (ops >= recoverySettings.translogOps() || size >= recoverySettings.translogSize().bytes()) {
+ if (size >= chunkSizeInBytes) {

// don't throttle translog, since we lock for phase3 indexing,
// so we need to move it as fast as possible. Note, since we
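With the operations-count threshold dropped, translog batching is driven by size alone: operations accumulate until their estimated size reaches the chunk size, then the batch is shipped. A simplified sketch of that loop, assuming a minimal Operation type with estimateSize(); the names are illustrative, not the actual RecoverySourceHandler code:

    import java.util.ArrayList;
    import java.util.List;

    // Size-based translog batching: accumulate until the chunk size is reached, then send.
    class TranslogBatcher {
        interface Operation {
            long estimateSize();
        }

        interface BatchSender {
            void send(List<Operation> batch);
        }

        static int sendInChunks(Iterable<Operation> snapshot, BatchSender sender, long chunkSizeInBytes) {
            List<Operation> operations = new ArrayList<>();
            long size = 0;
            int totalOperations = 0;
            for (Operation operation : snapshot) {
                operations.add(operation);
                size += operation.estimateSize();
                totalOperations++;
                // past the bytes threshold: ship the batch and start a new one
                if (size >= chunkSizeInBytes) {
                    sender.send(operations);
                    operations = new ArrayList<>();
                    size = 0;
                }
            }
            if (!operations.isEmpty()) {
                sender.send(operations); // final partial batch
            }
            return totalOperations;
        }
    }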
@@ -537,7 +535,7 @@ public class RecoverySourceHandler {

@Override
public final void write(int b) throws IOException {
- write(new byte[]{(byte) b}, 0, 1);
+ throw new UnsupportedOperationException("we can't send single bytes over the wire");
}

@Override
@@ -548,6 +546,11 @@ public class RecoverySourceHandler {
}

private void sendNextChunk(long position, BytesArray content, boolean lastChunk) throws IOException {
+ final TransportRequestOptions chunkSendOptions = TransportRequestOptions.builder()
+ .withCompress(false) // Lucene files are already compressed, so compressing them again won't really help much and we save the CPU for other things
+ .withType(TransportRequestOptions.Type.RECOVERY)
+ .withTimeout(recoverySettings.internalActionTimeout())
+ .build();
cancellableThreads.execute(() -> {
// Pause using the rate limiter, if desired, to throttle the recovery
final long throttleTimeInNanos;
@@ -577,7 +580,7 @@ public class RecoverySourceHandler {
* see how many translog ops we accumulate while copying files across the network. A future optimization
* would be in to restart file copy again (new deltas) if we have too many translog ops are piling up.
*/
- throttleTimeInNanos), requestOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
+ throttleTimeInNanos), chunkSendOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
});
if (shard.state() == IndexShardState.CLOSED) { // check if the shard got closed on us
throw new IndexShardClosedException(request.shardId());
@@ -670,9 +673,10 @@ public class RecoverySourceHandler {
pool = recoverySettings.concurrentSmallFileStreamPool();
}
Future<Void> future = pool.submit(() -> {
- try (final OutputStream outputStream = outputStreamFactory.apply(md);
- final IndexInput indexInput = store.directory().openInput(md.name(), IOContext.READONCE)) {
- Streams.copy(new InputStreamIndexInput(indexInput, md.length()), outputStream);
+ try (final IndexInput indexInput = store.directory().openInput(md.name(), IOContext.READONCE)) {
+ // it's fine that only the indexInput is in the try-with-resources block. The copy method handles
+ // exceptions during close correctly and doesn't hide the original exception.
+ Streams.copy(new InputStreamIndexInput(indexInput, md.length()), outputStreamFactory.apply(md));
}
return null;
});
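This last hunk relies on a property of the copy utility rather than on try-with-resources for the output side. As a rough sketch of that property, assuming nothing about the real org.elasticsearch.common.io.Streams.copy beyond what the added comment states, a copy helper with this shape closes both streams and keeps the original failure primary:

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;

    // Illustrative only: not the actual Streams.copy implementation. It shows the contract
    // the comment above relies on: both streams are closed even when the copy fails, and
    // close() failures are attached as suppressed exceptions instead of replacing the original one.
    final class CopySketch {
        static long copy(InputStream in, OutputStream out) throws IOException {
            try (InputStream input = in; OutputStream output = out) {
                long copied = 0;
                byte[] buffer = new byte[8 * 1024];
                int read;
                while ((read = input.read(buffer)) != -1) {
                    output.write(buffer, 0, read);
                    copied += read;
                }
                output.flush();
                return copied;
            }
            // try-with-resources closes both streams in reverse order and records any
            // exception thrown by close() as suppressed on the original exception.
        }
    }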