Browse Source

Handle lower retaining seqno retention lease error (#46420)

We renew the CCR retention lease at a fixed interval, therefore it's
possible to have more than one in-flight renewal requests at the same
time. If requests arrive out of order, then the assertion is violated.

Closes #46416
Closes #46013
Nhat Nguyen 6 years ago
parent
commit
b52c2d5d82

+ 6 - 1
server/src/main/java/org/elasticsearch/ElasticsearchException.java

@@ -1027,7 +1027,12 @@ public class ElasticsearchException extends RuntimeException implements ToXConte
                 org.elasticsearch.index.shard.ShardNotInPrimaryModeException.class,
                 org.elasticsearch.index.shard.ShardNotInPrimaryModeException::new,
                 155,
-                UNKNOWN_VERSION_ADDED);
+                UNKNOWN_VERSION_ADDED),
+        RETENTION_LEASE_INVALID_RETAINING_SEQUENCE_NUMBER_EXCEPTION(
+                org.elasticsearch.index.seqno.RetentionLeaseInvalidRetainingSeqNoException.class,
+                org.elasticsearch.index.seqno.RetentionLeaseInvalidRetainingSeqNoException::new,
+                156,
+                Version.V_8_0_0);
 
         final Class<? extends ElasticsearchException> exceptionClass;
         final CheckedFunction<StreamInput, ? extends ElasticsearchException, IOException> constructor;

+ 12 - 10
server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java

@@ -383,22 +383,24 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
      * @param retainingSequenceNumber the retaining sequence number
      * @param source                  the source of the retention lease
      * @return the renewed retention lease
-     * @throws RetentionLeaseNotFoundException if the specified retention lease does not exist
+     * @throws RetentionLeaseNotFoundException              if the specified retention lease does not exist
+     * @throws RetentionLeaseInvalidRetainingSeqNoException if the new retaining sequence number is lower than
+     *                                                      the retaining sequence number of the current retention lease.
      */
     public synchronized RetentionLease renewRetentionLease(final String id, final long retainingSequenceNumber, final String source) {
         assert primaryMode;
-        if (retentionLeases.contains(id) == false) {
+        final RetentionLease existingRetentionLease = retentionLeases.get(id);
+        if (existingRetentionLease == null) {
             throw new RetentionLeaseNotFoundException(id);
         }
+        if (retainingSequenceNumber < existingRetentionLease.retainingSequenceNumber()) {
+            assert PEER_RECOVERY_RETENTION_LEASE_SOURCE.equals(source) == false :
+                "renewing peer recovery retention lease [" + existingRetentionLease + "]" +
+                    " with a lower retaining sequence number [" + retainingSequenceNumber + "]";
+            throw new RetentionLeaseInvalidRetainingSeqNoException(id, source, retainingSequenceNumber, existingRetentionLease);
+        }
         final RetentionLease retentionLease =
-                new RetentionLease(id, retainingSequenceNumber, currentTimeMillisSupplier.getAsLong(), source);
-        final RetentionLease existingRetentionLease = retentionLeases.get(id);
-        assert existingRetentionLease != null;
-        assert existingRetentionLease.retainingSequenceNumber() <= retentionLease.retainingSequenceNumber() :
-                "retention lease renewal for [" + id + "]"
-                        + " from [" + source + "]"
-                        + " renewed a lower retaining sequence number [" + retentionLease.retainingSequenceNumber() + "]"
-                        + " than the current lease retaining sequence number [" + existingRetentionLease.retainingSequenceNumber() + "]";
+            new RetentionLease(id, retainingSequenceNumber, currentTimeMillisSupplier.getAsLong(), source);
         retentionLeases = new RetentionLeases(
                 operationPrimaryTerm,
                 retentionLeases.version() + 1,

+ 39 - 0
server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseInvalidRetainingSeqNoException.java

@@ -0,0 +1,39 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.index.seqno;
+
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.common.io.stream.StreamInput;
+
+import java.io.IOException;
+
+public class RetentionLeaseInvalidRetainingSeqNoException extends ElasticsearchException {
+
+    RetentionLeaseInvalidRetainingSeqNoException(String retentionLeaseId, String source, long retainingSequenceNumber,
+                                                 RetentionLease existingRetentionLease) {
+        super("the current retention lease with [" + retentionLeaseId + "]" +
+            " is retaining a higher sequence number [" + existingRetentionLease.retainingSequenceNumber() + "]" +
+            " than the new retaining sequence number [" + retainingSequenceNumber + "] from [" + source + "]");
+    }
+
+    public RetentionLeaseInvalidRetainingSeqNoException(StreamInput in) throws IOException {
+        super(in);
+    }
+}

+ 2 - 0
server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java

@@ -62,6 +62,7 @@ import org.elasticsearch.index.Index;
 import org.elasticsearch.index.engine.RecoveryEngineException;
 import org.elasticsearch.index.query.QueryShardException;
 import org.elasticsearch.index.seqno.RetentionLeaseAlreadyExistsException;
+import org.elasticsearch.index.seqno.RetentionLeaseInvalidRetainingSeqNoException;
 import org.elasticsearch.index.seqno.RetentionLeaseNotFoundException;
 import org.elasticsearch.index.shard.IllegalIndexShardStateException;
 import org.elasticsearch.index.shard.IndexShardState;
@@ -818,6 +819,7 @@ public class ExceptionSerializationTests extends ESTestCase {
         ids.put(153, RetentionLeaseAlreadyExistsException.class);
         ids.put(154, RetentionLeaseNotFoundException.class);
         ids.put(155, ShardNotInPrimaryModeException.class);
+        ids.put(156, RetentionLeaseInvalidRetainingSeqNoException.class);
 
         Map<Class<? extends ElasticsearchException>, Integer> reverse = new HashMap<>();
         for (Map.Entry<Integer, Class<? extends ElasticsearchException>> entry : ids.entrySet()) {

+ 30 - 0
server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java

@@ -719,6 +719,36 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes
         assertThat(replicationTracker.loadRetentionLeases(path), equalTo(replicationTracker.getRetentionLeases()));
     }
 
+    public void testRenewLeaseWithLowerRetainingSequenceNumber() throws Exception {
+        final AllocationId allocationId = AllocationId.newInitializing();
+        long primaryTerm = randomLongBetween(1, Long.MAX_VALUE);
+        final ReplicationTracker replicationTracker = new ReplicationTracker(
+            new ShardId("test", "_na", 0),
+            allocationId.getId(),
+            IndexSettingsModule.newIndexSettings("test", Settings.EMPTY),
+            primaryTerm,
+            UNASSIGNED_SEQ_NO,
+            value -> {},
+            () -> 0L,
+            (leases, listener) -> {},
+            OPS_BASED_RECOVERY_ALWAYS_REASONABLE);
+        replicationTracker.updateFromMaster(
+            randomNonNegativeLong(),
+            Collections.singleton(allocationId.getId()),
+            routingTable(Collections.emptySet(), allocationId));
+        replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED);
+        final String id = randomAlphaOfLength(8);
+        final long retainingSequenceNumber = randomNonNegativeLong();
+        final String source = randomAlphaOfLength(8);
+        replicationTracker.addRetentionLease(id, retainingSequenceNumber, source, ActionListener.wrap(() -> {}));
+        final long lowerRetainingSequenceNumber = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, retainingSequenceNumber - 1);
+        final RetentionLeaseInvalidRetainingSeqNoException e = expectThrows(RetentionLeaseInvalidRetainingSeqNoException.class,
+            () -> replicationTracker.renewRetentionLease(id, lowerRetainingSequenceNumber, source));
+        assertThat(e, hasToString(containsString("the current retention lease with [" + id + "]" +
+            " is retaining a higher sequence number [" + retainingSequenceNumber + "]" +
+            " than the new retaining sequence number [" + lowerRetainingSequenceNumber + "] from [" + source + "]")));
+    }
+
     private void assertRetentionLeases(
             final ReplicationTracker replicationTracker,
             final int size,

+ 7 - 4
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java

@@ -44,6 +44,7 @@ import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.index.engine.CommitStats;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.seqno.RetentionLeaseActions;
+import org.elasticsearch.index.seqno.RetentionLeaseInvalidRetainingSeqNoException;
 import org.elasticsearch.index.seqno.RetentionLeaseNotFoundException;
 import org.elasticsearch.index.seqno.SeqNoStats;
 import org.elasticsearch.index.shard.ShardId;
@@ -469,11 +470,13 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
 
             private void logRetentionLeaseFailure(final String retentionLeaseId, final Throwable cause) {
                 assert cause instanceof ElasticsearchSecurityException == false : cause;
-                logger.warn(new ParameterizedMessage(
-                                "{} background management of retention lease [{}] failed while following",
-                                params.getFollowShardId(),
-                                retentionLeaseId),
+                if (cause instanceof RetentionLeaseInvalidRetainingSeqNoException == false) {
+                    logger.warn(new ParameterizedMessage(
+                            "{} background management of retention lease [{}] failed while following",
+                            params.getFollowShardId(),
+                            retentionLeaseId),
                         cause);
+                }
             }
 
         };

+ 8 - 4
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java

@@ -45,6 +45,7 @@ import org.elasticsearch.index.engine.EngineException;
 import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.seqno.LocalCheckpointTracker;
 import org.elasticsearch.index.seqno.RetentionLeaseAlreadyExistsException;
+import org.elasticsearch.index.seqno.RetentionLeaseInvalidRetainingSeqNoException;
 import org.elasticsearch.index.seqno.RetentionLeaseNotFoundException;
 import org.elasticsearch.index.shard.IndexShardRecoveryException;
 import org.elasticsearch.index.shard.ShardId;
@@ -335,10 +336,13 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
                                 ActionListener.wrap(
                                         r -> {},
                                         e -> {
-                                            assert e instanceof ElasticsearchSecurityException == false : e;
-                                            logger.warn(new ParameterizedMessage(
-                                                            "{} background renewal of retention lease [{}] failed during restore", shardId,
-                                                    retentionLeaseId), e);
+                                            final Throwable cause = ExceptionsHelper.unwrapCause(e);
+                                            assert cause instanceof ElasticsearchSecurityException == false : cause;
+                                            if (cause instanceof RetentionLeaseInvalidRetainingSeqNoException == false) {
+                                                logger.warn(new ParameterizedMessage(
+                                                    "{} background renewal of retention lease [{}] failed during restore", shardId,
+                                                    retentionLeaseId), cause);
+                                            }
                                         }));
                     }
                 },

+ 0 - 2
x-pack/qa/rolling-upgrade-multi-cluster/src/test/java/org/elasticsearch/upgrades/CcrRollingUpgradeIT.java

@@ -23,7 +23,6 @@ import static org.hamcrest.Matchers.equalTo;
 
 public class CcrRollingUpgradeIT extends AbstractMultiClusterUpgradeTestCase {
 
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/46416")
     public void testUniDirectionalIndexFollowing() throws Exception {
         logger.info("clusterName={}, upgradeState={}", clusterName, upgradeState);
 
@@ -91,7 +90,6 @@ public class CcrRollingUpgradeIT extends AbstractMultiClusterUpgradeTestCase {
         }
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/46416")
     public void testAutoFollowing() throws Exception {
         String leaderIndex1 = "logs-20200101";
         String leaderIndex2 = "logs-20200102";