Browse Source

Enforce retention leases require soft deletes (#39922)

If a primary on 6.7 and a replica on 5.6 are running more than 5 minutes
(retention leases background sync interval), the retention leases
background sync will be triggered, and it will trip 6.7 node due to the
illegal checkpoint value. We can fix the problem by making the returned
checkpoint depends on the node version. This PR, however, chooses to
enforce retention leases require soft deletes, and make retention leases
sync noop if soft deletes is disabled instead.

Closes #39914
Nhat Nguyen 6 years ago
parent
commit
695b20f01b

+ 3 - 1
server/src/main/java/org/elasticsearch/index/IndexService.java

@@ -790,7 +790,9 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
     }
 
     private void syncRetentionLeases() {
-        sync(IndexShard::syncRetentionLeases, "retention lease");
+        if (indexSettings.isSoftDeleteEnabled()) {
+            sync(IndexShard::syncRetentionLeases, "retention lease");
+        }
     }
 
     private void sync(final Consumer<IndexShard> sync, final String source) {

+ 12 - 0
server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

@@ -1897,6 +1897,14 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         this.globalCheckpointListeners.add(waitingForGlobalCheckpoint, listener, timeout);
     }
 
+    private void ensureSoftDeletesEnabled(String feature) {
+        if (indexSettings.isSoftDeleteEnabled() == false) {
+            String message = feature + " requires soft deletes but " + indexSettings.getIndex() + " does not have soft deletes enabled";
+            assert false : message;
+            throw new IllegalStateException(message);
+        }
+    }
+
     /**
      * Get all retention leases tracked on this shard.
      *
@@ -1943,6 +1951,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         Objects.requireNonNull(listener);
         assert assertPrimaryMode();
         verifyNotClosed();
+        ensureSoftDeletesEnabled("retention leases");
         try (Closeable ignore = acquireRetentionLock()) {
             final long actualRetainingSequenceNumber =
                     retainingSequenceNumber == RETAIN_ALL ? getMinRetainedSeqNo() : retainingSequenceNumber;
@@ -1964,6 +1973,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
     public RetentionLease renewRetentionLease(final String id, final long retainingSequenceNumber, final String source) {
         assert assertPrimaryMode();
         verifyNotClosed();
+        ensureSoftDeletesEnabled("retention leases");
         try (Closeable ignore = acquireRetentionLock()) {
             final long actualRetainingSequenceNumber =
                     retainingSequenceNumber == RETAIN_ALL ? getMinRetainedSeqNo() : retainingSequenceNumber;
@@ -1983,6 +1993,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         Objects.requireNonNull(listener);
         assert assertPrimaryMode();
         verifyNotClosed();
+        ensureSoftDeletesEnabled("retention leases");
         replicationTracker.removeRetentionLease(id, listener);
     }
 
@@ -2024,6 +2035,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
     public void syncRetentionLeases() {
         assert assertPrimaryMode();
         verifyNotClosed();
+        ensureSoftDeletesEnabled("retention leases");
         final Tuple<Boolean, RetentionLeases> retentionLeases = getRetentionLeases(true);
         if (retentionLeases.v1()) {
             logger.trace("syncing retention leases [{}] after expiration check", retentionLeases.v2());

+ 33 - 1
server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java

@@ -33,7 +33,9 @@ import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.test.ESIntegTestCase;
+import org.elasticsearch.test.transport.MockTransportService;
 import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.TransportService;
 
 import java.io.Closeable;
 import java.util.ArrayList;
@@ -73,7 +75,7 @@ public class RetentionLeaseIT extends ESIntegTestCase  {
     protected Collection<Class<? extends Plugin>> nodePlugins() {
         return Stream.concat(
                 super.nodePlugins().stream(),
-                Stream.of(RetentionLeaseSyncIntervalSettingPlugin.class))
+                Stream.of(RetentionLeaseSyncIntervalSettingPlugin.class, MockTransportService.TestPlugin.class))
                 .collect(Collectors.toList());
     }
 
@@ -317,6 +319,36 @@ public class RetentionLeaseIT extends ESIntegTestCase  {
         }
     }
 
+    public void testRetentionLeasesBackgroundSyncWithSoftDeletesDisabled() throws Exception {
+        final int numberOfReplicas = 2 - scaledRandomIntBetween(0, 2);
+        internalCluster().ensureAtLeastNumDataNodes(1 + numberOfReplicas);
+        TimeValue syncIntervalSetting = TimeValue.timeValueMillis(between(1, 100));
+        final Settings settings = Settings.builder()
+            .put("index.number_of_shards", 1)
+            .put("index.number_of_replicas", numberOfReplicas)
+            .put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), syncIntervalSetting.getStringRep())
+            .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), false)
+            .build();
+        createIndex("index", settings);
+        final String primaryShardNodeId = clusterService().state().routingTable().index("index").shard(0).primaryShard().currentNodeId();
+        final String primaryShardNodeName = clusterService().state().nodes().get(primaryShardNodeId).getName();
+        final MockTransportService primaryTransportService = (MockTransportService) internalCluster().getInstance(
+            TransportService.class, primaryShardNodeName);
+        final AtomicBoolean backgroundSyncRequestSent = new AtomicBoolean();
+        primaryTransportService.addSendBehavior((connection, requestId, action, request, options) -> {
+            if (action.startsWith(RetentionLeaseBackgroundSyncAction.ACTION_NAME)) {
+                backgroundSyncRequestSent.set(true);
+            }
+            connection.sendRequest(requestId, action, request, options);
+        });
+        final long start = System.nanoTime();
+        ensureGreen("index");
+        final long syncEnd = System.nanoTime();
+        // We sleep long enough for the retention leases background sync to be triggered
+        Thread.sleep(Math.max(0, randomIntBetween(2, 3) * syncIntervalSetting.millis() - TimeUnit.NANOSECONDS.toMillis(syncEnd - start)));
+        assertFalse("retention leases background sync must be a noop if soft deletes is disabled", backgroundSyncRequestSent.get());
+    }
+
     public void testRetentionLeasesSyncOnRecovery() throws Exception {
         final int numberOfReplicas = 2 - scaledRandomIntBetween(0, 2);
         internalCluster().ensureAtLeastNumDataNodes(1 + numberOfReplicas);

+ 2 - 0
server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseStatsTests.java

@@ -23,6 +23,7 @@ import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
 import org.elasticsearch.action.support.replication.ReplicationResponse;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.indices.IndicesService;
@@ -41,6 +42,7 @@ public class RetentionLeaseStatsTests extends ESSingleNodeTestCase {
         final Settings settings = Settings.builder()
                 .put("index.number_of_shards", 1)
                 .put("index.number_of_replicas", 0)
+                .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
                 .build();
         createIndex("index", settings);
         ensureGreen("index");

+ 23 - 3
server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java

@@ -71,7 +71,8 @@ public class IndexShardRetentionLeaseTests extends IndexShardTestCase {
     }
 
     public void testAddOrRenewRetentionLease() throws IOException {
-        final IndexShard indexShard = newStartedShard(true);
+        final IndexShard indexShard = newStartedShard(true,
+            Settings.builder().put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true).build());
         final long primaryTerm = indexShard.getOperationPrimaryTerm();
         try {
             final int length = randomIntBetween(0, 8);
@@ -102,7 +103,8 @@ public class IndexShardRetentionLeaseTests extends IndexShardTestCase {
     }
 
     public void testRemoveRetentionLease() throws IOException {
-        final IndexShard indexShard = newStartedShard(true);
+        final IndexShard indexShard = newStartedShard(true,
+            Settings.builder().put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true).build());
         final long primaryTerm = indexShard.getOperationPrimaryTerm();
         try {
             final int length = randomIntBetween(0, 8);
@@ -143,6 +145,7 @@ public class IndexShardRetentionLeaseTests extends IndexShardTestCase {
         final long retentionLeaseMillis = randomLongBetween(1, TimeValue.timeValueHours(12).millis());
         final Settings settings = Settings
                 .builder()
+                .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
                 .put(
                         IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_PERIOD_SETTING.getKey(),
                         TimeValue.timeValueMillis(retentionLeaseMillis))
@@ -268,7 +271,8 @@ public class IndexShardRetentionLeaseTests extends IndexShardTestCase {
     }
 
     public void testRetentionLeaseStats() throws IOException {
-        final IndexShard indexShard = newStartedShard(true);
+        final IndexShard indexShard = newStartedShard(true,
+            Settings.builder().put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true).build());
         try {
             final int length = randomIntBetween(0, 8);
             final long[] minimumRetainingSequenceNumbers = new long[length];
@@ -289,6 +293,22 @@ public class IndexShardRetentionLeaseTests extends IndexShardTestCase {
         }
     }
 
+    public void testRetentionLeasesActionsFailWithSoftDeletesDisabled() throws Exception {
+        IndexShard shard = newStartedShard(true, Settings.builder().put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), false).build());
+        assertThat(expectThrows(AssertionError.class, () -> shard.addRetentionLease(randomAlphaOfLength(10),
+            randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE), "test", ActionListener.wrap(() -> {}))).getMessage(),
+            equalTo("retention leases requires soft deletes but [index] does not have soft deletes enabled"));
+        assertThat(expectThrows(AssertionError.class, () -> shard.renewRetentionLease(
+            randomAlphaOfLength(10), randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE), "test")).getMessage(),
+            equalTo("retention leases requires soft deletes but [index] does not have soft deletes enabled"));
+        assertThat(expectThrows(AssertionError.class, () -> shard.removeRetentionLease(
+            randomAlphaOfLength(10), ActionListener.wrap(() -> {}))).getMessage(),
+            equalTo("retention leases requires soft deletes but [index] does not have soft deletes enabled"));
+        assertThat(expectThrows(AssertionError.class, shard::syncRetentionLeases).getMessage(),
+            equalTo("retention leases requires soft deletes but [index] does not have soft deletes enabled"));
+        closeShards(shard);
+    }
+
     private void assertRetentionLeases(
             final IndexShard indexShard,
             final int size,