Browse Source

Never release store using CancellableThreads (#45409)

Today we can release a Store using CancellableThreads. If we are holding
the last reference, then we will verify the node lock before deleting
the store. Checking node lock performs some I/O on FileChannel. If the
current thread is interrupted, then the channel will be closed and the
node lock will also be invalid.

Closes #45237
Nhat Nguyen 6 years ago
parent
commit
0fb695e2e4

+ 5 - 0
server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java

@@ -112,6 +112,11 @@ public class PeerRecoverySourceService implements IndexEventListener {
         }
     }
 
+    // exposed for testing
+    final int numberOfOngoingRecoveries() {
+        return ongoingRecoveries.ongoingRecoveries.size();
+    }
+
     final class OngoingRecoveries {
         private final Map<IndexShard, ShardRecoveryContext> ongoingRecoveries = new HashMap<>();
 

+ 22 - 2
server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java

@@ -33,7 +33,9 @@ import org.apache.lucene.util.SetOnce;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionRunnable;
 import org.elasticsearch.action.StepListener;
+import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.action.support.ThreadedActionListener;
 import org.elasticsearch.action.support.replication.ReplicationResponse;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
@@ -230,8 +232,7 @@ public class RecoverySourceHandler {
 
                 try {
                     final int estimateNumOps = shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo);
-                    shard.store().incRef();
-                    final Releasable releaseStore = Releasables.releaseOnce(shard.store()::decRef);
+                    final Releasable releaseStore = acquireStore(shard.store());
                     resources.add(releaseStore);
                     sendFileStep.whenComplete(r -> IOUtils.close(safeCommitRef, releaseStore), e -> {
                         try {
@@ -393,6 +394,25 @@ public class RecoverySourceHandler {
         });
     }
 
+    /**
+     * Increases the store reference and returns a {@link Releasable} that will decrease the store reference using the generic thread pool.
+     * We must never release the store using an interruptible thread as we can risk invalidating the node lock.
+     */
+    private Releasable acquireStore(Store store) {
+        store.incRef();
+        return Releasables.releaseOnce(() -> {
+            final PlainActionFuture<Void> future = new PlainActionFuture<>();
+            threadPool.generic().execute(new ActionRunnable<>(future) {
+                @Override
+                protected void doRun() {
+                    store.decRef();
+                    listener.onResponse(null);
+                }
+            });
+            FutureUtils.get(future);
+        });
+    }
+
     static final class SendFileResult {
         final List<String> phase1FileNames;
         final List<Long> phase1FileSizes;

+ 0 - 1
server/src/test/java/org/elasticsearch/cluster/SimpleDataNodesIT.java

@@ -95,7 +95,6 @@ public class SimpleDataNodesIT extends ESIntegTestCase {
             equalTo(false));
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/45237")
     public void testAutoExpandReplicasAdjustedWhenDataNodeJoins() {
         internalCluster().startNode(Settings.builder().put(Node.NODE_DATA_SETTING.getKey(), false).build());
         client().admin().indices().create(createIndexRequest("test")

+ 17 - 0
server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java

@@ -36,6 +36,7 @@ import org.elasticsearch.action.admin.indices.stats.ShardStats;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
 import org.elasticsearch.action.support.replication.ReplicationResponse;
@@ -1486,4 +1487,20 @@ public class IndexRecoveryIT extends ESIntegTestCase {
         }
         ensureGreen(indexName);
     }
+
+    public void testCancelRecoveryWithAutoExpandReplicas() throws Exception {
+        internalCluster().startMasterOnlyNode();
+        assertAcked(client().admin().indices().prepareCreate("test")
+            .setSettings(Settings.builder().put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-all"))
+            .setWaitForActiveShards(ActiveShardCount.NONE));
+        internalCluster().startNode();
+        internalCluster().startNode();
+        client().admin().cluster().prepareReroute().setRetryFailed(true).get();
+        assertAcked(client().admin().indices().prepareDelete("test")); // cancel recoveries
+        assertBusy(() -> {
+            for (PeerRecoverySourceService recoveryService : internalCluster().getDataNodeInstances(PeerRecoverySourceService.class)) {
+                assertThat(recoveryService.numberOfOngoingRecoveries(), equalTo(0));
+            }
+        });
+    }
 }