Bladeren bron

Free shard resources when recovery reset is cancelled

Resetting a recovery consists of resetting the old recovery target and replacing it by a new recovery target object. This is done on the Cancellable threads of
the new recovery target. If the new recovery target is already cancelled before or while this happens, for example due to shard closing or recovery source
changing, we have to make sure that the old recovery target object frees all shard resources.

Relates to #22325
Yannick Welsch 8 jaren geleden
bovenliggende
commit
816e1c6cc4

+ 12 - 2
core/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java

@@ -26,6 +26,7 @@ import org.elasticsearch.ElasticsearchTimeoutException;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.Callback;
+import org.elasticsearch.common.util.CancellableThreads;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.index.shard.IndexShard;
@@ -106,9 +107,18 @@ public class RecoveriesCollection {
             }
 
             // Closes the current recovery target
-            final RecoveryTarget finalOldRecoveryTarget = oldRecoveryTarget;
             final AtomicBoolean successfulReset = new AtomicBoolean();
-            newRecoveryTarget.CancellableThreads().executeIO(() -> successfulReset.set(finalOldRecoveryTarget.resetRecovery()));
+            try {
+                final RecoveryTarget finalOldRecoveryTarget = oldRecoveryTarget;
+                newRecoveryTarget.CancellableThreads().executeIO(() -> successfulReset.set(finalOldRecoveryTarget.resetRecovery()));
+            } catch (CancellableThreads.ExecutionCancelledException e) {
+                // new recovery target is already cancelled (probably due to shard closing or recovery source changing)
+                assert onGoingRecoveries.containsKey(newRecoveryTarget.recoveryId()) == false;
+                logger.trace("{} recovery reset cancelled, recovery from {}, id [{}], previous id [{}]", newRecoveryTarget.shardId(),
+                    newRecoveryTarget.sourceNode(), newRecoveryTarget.recoveryId(), oldRecoveryTarget.recoveryId());
+                oldRecoveryTarget.cancel("recovery reset cancelled"); // if finalOldRecoveryTarget.resetRecovery did not even get to execute
+                return null;
+            }
             if (successfulReset.get() == false) {
                 cancelRecovery(newRecoveryTarget.recoveryId(), "failed to reset recovery");
                 return null;

+ 8 - 3
core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java

@@ -184,9 +184,14 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
      */
     boolean resetRecovery() throws InterruptedException, IOException {
         if (finished.compareAndSet(false, true)) {
-            logger.debug("reset of recovery with shard {} and id [{}]", shardId, recoveryId);
-            // release the initial reference. recovery files will be cleaned as soon as ref count goes to zero, potentially now
-            decRef();
+            try {
+                // yes, this is just a logger call in a try-finally block. The reason for this is that resetRecovery is called from
+                // CancellableThreads and we have to make sure that all references to IndexShard are cleaned up before exiting this method
+                logger.debug("reset of recovery with shard {} and id [{}]", shardId, recoveryId);
+            } finally {
+                // release the initial reference. recovery files will be cleaned as soon as ref count goes to zero, potentially now.
+                decRef();
+            }
             closedLatch.await();
             RecoveryState.Stage stage = indexShard.recoveryState().getStage();
             if (indexShard.recoveryState().getPrimary() && (stage == RecoveryState.Stage.FINALIZE || stage == RecoveryState.Stage.DONE)) {