|
@@ -36,6 +36,7 @@ import org.elasticsearch.indices.IndicesService;
|
|
|
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
|
|
|
import org.elasticsearch.plugins.Plugin;
|
|
|
import org.elasticsearch.test.ESIntegTestCase;
|
|
|
+import org.elasticsearch.test.junit.annotations.TestLogging;
|
|
|
import org.elasticsearch.test.transport.MockTransportService;
|
|
|
import org.elasticsearch.threadpool.ThreadPool;
|
|
|
import org.elasticsearch.transport.TransportService;
|
|
@@ -355,6 +356,7 @@ public class RetentionLeaseIT extends ESIntegTestCase {
|
|
|
assertFalse("retention leases background sync must be a noop if soft deletes is disabled", backgroundSyncRequestSent.get());
|
|
|
}
|
|
|
|
|
|
+ @TestLogging(value = "org.elasticsearch.indices.recovery:trace")
|
|
|
public void testRetentionLeasesSyncOnRecovery() throws Exception {
|
|
|
final int numberOfReplicas = 2 - scaledRandomIntBetween(0, 2);
|
|
|
internalCluster().ensureAtLeastNumDataNodes(1 + numberOfReplicas);
|
|
@@ -382,6 +384,7 @@ public class RetentionLeaseIT extends ESIntegTestCase {
|
|
|
.getShardOrNull(new ShardId(resolveIndex("index"), 0));
|
|
|
final int length = randomIntBetween(1, 8);
|
|
|
final Map<String, RetentionLease> currentRetentionLeases = new LinkedHashMap<>();
|
|
|
+        logger.info("adding retention [{}] leases", length);
|
|
|
for (int i = 0; i < length; i++) {
|
|
|
final String id = randomValueOtherThanMany(currentRetentionLeases.keySet()::contains, () -> randomAlphaOfLength(8));
|
|
|
final long retainingSequenceNumber = randomLongBetween(0, Long.MAX_VALUE);
|
|
@@ -392,19 +395,23 @@ public class RetentionLeaseIT extends ESIntegTestCase {
|
|
|
latch.await();
|
|
|
currentRetentionLeases.put(id, primary.renewRetentionLease(id, retainingSequenceNumber, source));
|
|
|
}
|
|
|
-
|
|
|
- // Cause some recoveries to fail to ensure that retention leases are handled properly when retrying a recovery
|
|
|
- assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(Settings.builder()
|
|
|
- .put(INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING.getKey(), "100ms")));
|
|
|
+ logger.info("finished adding [{}] retention leases", length);
|
|
|
+
|
|
|
+ // cause some recoveries to fail to ensure that retention leases are handled properly when retrying a recovery
|
|
|
+ assertAcked(client().admin()
|
|
|
+ .cluster()
|
|
|
+ .prepareUpdateSettings()
|
|
|
+ .setPersistentSettings(
|
|
|
+ Settings.builder().put(INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING.getKey(), TimeValue.timeValueMillis(100))));
|
|
|
final Semaphore recoveriesToDisrupt = new Semaphore(scaledRandomIntBetween(0, 4));
|
|
|
- final MockTransportService primaryTransportService
|
|
|
- = (MockTransportService) internalCluster().getInstance(TransportService.class, primaryShardNodeName);
|
|
|
+ final MockTransportService primaryTransportService =
|
|
|
+ (MockTransportService) internalCluster().getInstance(TransportService.class, primaryShardNodeName);
|
|
|
primaryTransportService.addSendBehavior((connection, requestId, action, request, options) -> {
|
|
|
if (action.equals(PeerRecoveryTargetService.Actions.FINALIZE) && recoveriesToDisrupt.tryAcquire()) {
|
|
|
if (randomBoolean()) {
|
|
|
// return a ConnectTransportException to the START_RECOVERY action
|
|
|
- final TransportService replicaTransportService
|
|
|
- = internalCluster().getInstance(TransportService.class, connection.getNode().getName());
|
|
|
+ final TransportService replicaTransportService =
|
|
|
+ internalCluster().getInstance(TransportService.class, connection.getNode().getName());
|
|
|
final DiscoveryNode primaryNode = primaryTransportService.getLocalNode();
|
|
|
replicaTransportService.disconnectFromNode(primaryNode);
|
|
|
replicaTransportService.connectToNode(primaryNode);
|
|
@@ -416,6 +423,7 @@ public class RetentionLeaseIT extends ESIntegTestCase {
|
|
|
connection.sendRequest(requestId, action, request, options);
|
|
|
});
|
|
|
|
|
|
+ logger.info("allow [{}] replicas to allocate", numberOfReplicas);
|
|
|
// now allow the replicas to be allocated and wait for recovery to finalize
|
|
|
allowNodes("index", 1 + numberOfReplicas);
|
|
|
ensureGreen("index");
|