Browse Source

Fork after calling getRepositoryData from StoreRecovery (#87254)

`Repository#getRepositoryData` will occasionally complete its listener
on a blocking-free thread (a transport worker or the cluster applier
thread) so it's not valid to call `Repository#restoreShard` directly.
This commit fixes this by using a forking listener.

Closes #87237
David Turner 3 years ago
parent
commit
1c48b04b25

+ 6 - 0
docs/changelog/87254.yaml

@@ -0,0 +1,6 @@
+pr: 87254
+summary: Fork after calling `getRepositoryData` from `StoreRecovery`
+area: Snapshot/Restore
+type: bug
+issues:
+ - 87237

+ 17 - 6
server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java

@@ -22,6 +22,7 @@ import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.StepListener;
+import org.elasticsearch.action.support.ThreadedActionListener;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.MappingMetadata;
 import org.elasticsearch.cluster.routing.RecoverySource;
@@ -42,6 +43,7 @@ import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.indices.recovery.RecoveryState;
 import org.elasticsearch.repositories.IndexId;
 import org.elasticsearch.repositories.Repository;
+import org.elasticsearch.threadpool.ThreadPool;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -529,22 +531,31 @@ public final class StoreRecovery {
             // If the index UUID was not found in the recovery source we will have to load RepositoryData and resolve it by index name
             if (indexId.getId().equals(IndexMetadata.INDEX_UUID_NA_VALUE)) {
                 // BwC path, running against an old version master that did not add the IndexId to the recovery source
-                repository.getRepositoryData(indexIdListener.map(repositoryData -> repositoryData.resolveIndexId(indexId.getName())));
+                repository.getRepositoryData(
+                    new ThreadedActionListener<>(
+                        logger,
+                        indexShard.getThreadPool(),
+                        ThreadPool.Names.GENERIC,
+                        indexIdListener.map(repositoryData -> repositoryData.resolveIndexId(indexId.getName())),
+                        false
+                    )
+                );
             } else {
                 indexIdListener.onResponse(indexId);
             }
             assert indexShard.getEngineOrNull() == null;
-            indexIdListener.whenComplete(
-                idx -> repository.restoreShard(
+            indexIdListener.whenComplete(idx -> {
+                assert Thread.currentThread().getName().contains('[' + ThreadPool.Names.GENERIC + ']')
+                    || Thread.currentThread().getName().startsWith("TEST-") : Thread.currentThread().getName();
+                repository.restoreShard(
                     indexShard.store(),
                     restoreSource.snapshot().getSnapshotId(),
                     idx,
                     snapshotShardId,
                     indexShard.recoveryState(),
                     restoreListener
-                ),
-                restoreListener::onFailure
-            );
+                );
+            }, restoreListener::onFailure);
         } catch (Exception e) {
             restoreListener.onFailure(e);
         }