|
@@ -19,14 +19,19 @@ import org.elasticsearch.common.settings.Settings;
|
|
|
import org.elasticsearch.common.unit.ByteSizeUnit;
|
|
|
import org.elasticsearch.common.unit.ByteSizeValue;
|
|
|
import org.elasticsearch.index.query.QueryBuilders;
|
|
|
+import org.elasticsearch.index.shard.ShardId;
|
|
|
+import org.elasticsearch.indices.IndicesService;
|
|
|
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
|
|
|
import org.elasticsearch.indices.recovery.RecoveryFileChunkRequest;
|
|
|
+import org.elasticsearch.indices.recovery.RecoveryFilesInfoRequest;
|
|
|
import org.elasticsearch.node.RecoverySettingsChunkSizePlugin;
|
|
|
import org.elasticsearch.plugins.Plugin;
|
|
|
import org.elasticsearch.test.ESIntegTestCase;
|
|
|
import org.elasticsearch.test.transport.MockTransportService;
|
|
|
import org.elasticsearch.transport.TransportService;
|
|
|
|
|
|
+import java.nio.file.Files;
|
|
|
+import java.nio.file.Path;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Arrays;
|
|
|
import java.util.Collection;
|
|
@@ -34,6 +39,7 @@ import java.util.Collections;
|
|
|
import java.util.List;
|
|
|
import java.util.concurrent.CountDownLatch;
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
+import java.util.function.Function;
|
|
|
|
|
|
import static org.elasticsearch.node.RecoverySettingsChunkSizePlugin.CHUNK_SIZE_SETTING;
|
|
|
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
|
@@ -72,16 +78,14 @@ public class TruncatedRecoveryIT extends ESIntegTestCase {
|
|
|
// we use 2 nodes a lucky and unlucky one
|
|
|
// the lucky one holds the primary
|
|
|
// the unlucky one gets the replica and the truncated leftovers
|
|
|
- NodeStats primariesNode = dataNodeStats.get(0);
|
|
|
- NodeStats unluckyNode = dataNodeStats.get(1);
|
|
|
+ String primariesNode = dataNodeStats.get(0).getNode().getName();
|
|
|
+ String unluckyNode = dataNodeStats.get(1).getNode().getName();
|
|
|
|
|
|
// create the index and prevent allocation on any other nodes than the lucky one
|
|
|
// we have no replicas so far and make sure that we allocate the primary on the lucky node
|
|
|
assertAcked(
|
|
|
prepareCreate("test").setMapping("field1", "type=text", "the_id", "type=text")
|
|
|
- .setSettings(
|
|
|
- indexSettings(numberOfShards(), 0).put("index.routing.allocation.include._name", primariesNode.getNode().getName())
|
|
|
- )
|
|
|
+ .setSettings(indexSettings(numberOfShards(), 0).put("index.routing.allocation.include._name", primariesNode))
|
|
|
); // only allocate on the lucky node
|
|
|
|
|
|
// index some docs and check if they are coming back
|
|
@@ -102,20 +106,54 @@ public class TruncatedRecoveryIT extends ESIntegTestCase {
|
|
|
indicesAdmin().prepareFlush().setForce(true).get(); // double flush to create safe commit in case of async durability
|
|
|
indicesAdmin().prepareForceMerge().setMaxNumSegments(1).setFlush(true).get();
|
|
|
|
|
|
+ // We write some garbage into the shard directory so that we can verify that it is cleaned up before we resend.
|
|
|
+ // Cleanup helps prevent recovery from failing due to lack of space from garbage left over from a previous
|
|
|
+ // recovery that crashed during file transmission. #104473
|
|
|
+ // We can't look for the presence of the recovery temp files themselves because they are automatically
|
|
|
+ // cleaned up on clean shutdown by MultiFileWriter.
|
|
|
+ final String GARBAGE_PREFIX = "recovery.garbage.";
|
|
|
+
|
|
|
final CountDownLatch latch = new CountDownLatch(1);
|
|
|
final AtomicBoolean truncate = new AtomicBoolean(true);
|
|
|
+
|
|
|
+ IndicesService unluckyIndices = internalCluster().getInstance(IndicesService.class, unluckyNode);
|
|
|
+ Function<ShardId, Path> getUnluckyIndexPath = (shardId) -> unluckyIndices.indexService(shardId.getIndex())
|
|
|
+ .getShard(shardId.getId())
|
|
|
+ .shardPath()
|
|
|
+ .resolveIndex();
|
|
|
+
|
|
|
for (NodeStats dataNode : dataNodeStats) {
|
|
|
MockTransportService.getInstance(dataNode.getNode().getName())
|
|
|
.addSendBehavior(
|
|
|
- internalCluster().getInstance(TransportService.class, unluckyNode.getNode().getName()),
|
|
|
+ internalCluster().getInstance(TransportService.class, unluckyNode),
|
|
|
(connection, requestId, action, request, options) -> {
|
|
|
if (action.equals(PeerRecoveryTargetService.Actions.FILE_CHUNK)) {
|
|
|
RecoveryFileChunkRequest req = (RecoveryFileChunkRequest) request;
|
|
|
logger.info("file chunk [{}] lastChunk: {}", req, req.lastChunk());
|
|
|
+ // During the first recovery attempt (when truncate is set), write an extra garbage file once for each
|
|
|
+ // file transmitted. We get multiple chunks per file but only one is the last.
|
|
|
+ if (truncate.get() && req.lastChunk()) {
|
|
|
+ final var shardPath = getUnluckyIndexPath.apply(req.shardId());
|
|
|
+ final var garbagePath = Files.createTempFile(shardPath, GARBAGE_PREFIX, null);
|
|
|
+ logger.info("writing garbage at: {}", garbagePath);
|
|
|
+ }
|
|
|
if ((req.name().endsWith("cfs") || req.name().endsWith("fdt")) && req.lastChunk() && truncate.get()) {
|
|
|
latch.countDown();
|
|
|
throw new RuntimeException("Caused some truncated files for fun and profit");
|
|
|
}
|
|
|
+ } else if (action.equals(PeerRecoveryTargetService.Actions.FILES_INFO)) {
|
|
|
+ // Verify there are no garbage files present at the FILES_INFO stage of recovery. This precedes FILE_CHUNK
|
|
|
+ // and so will run before garbage has been introduced on the first attempt, and before post-transfer cleanup
|
|
|
+ // has been performed on the second.
|
|
|
+ final var shardPath = getUnluckyIndexPath.apply(((RecoveryFilesInfoRequest) request).shardId());
|
|
|
+ // Compare the file name as a String: Path#startsWith(String) matches whole name elements, so it never sees a prefix.
|
|
|
+ try (var list = Files.list(shardPath).filter(path -> path.getFileName().toString().startsWith(GARBAGE_PREFIX))) {
|
|
|
+ assertArrayEquals(
|
|
|
+ "garbage files should have been cleaned before file transmission",
|
|
|
+ new Path[0],
|
|
|
+ list.toArray()
|
|
|
+ );
|
|
|
+ }
|
|
|
}
|
|
|
connection.sendRequest(requestId, action, request, options);
|
|
|
}
|
|
@@ -128,14 +166,14 @@ public class TruncatedRecoveryIT extends ESIntegTestCase {
|
|
|
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
|
|
|
.put(
|
|
|
"index.routing.allocation.include._name", // now allow allocation on all nodes
|
|
|
- primariesNode.getNode().getName() + "," + unluckyNode.getNode().getName()
|
|
|
+ primariesNode + "," + unluckyNode
|
|
|
),
|
|
|
"test"
|
|
|
);
|
|
|
|
|
|
latch.await();
|
|
|
|
|
|
- // at this point we got some truncated left overs on the replica on the unlucky node
|
|
|
+ // at this point we got some truncated leftovers on the replica on the unlucky node
|
|
|
// now we are allowing the recovery to allocate again and finish to see if we wipe the truncated files
|
|
|
truncate.compareAndSet(true, false);
|
|
|
ensureGreen("test");
|