Browse Source

Remove needless forking in TransportPutFollowAction (#91405)

Now that https://github.com/elastic/elasticsearch/pull/87235 makes the
Ccr repository non-blocking, there's no need to fork in this action just
like we don't fork in a normal restore operation.
Armin Braun 2 years ago
parent
commit
7ce46dcdb3

+ 35 - 48
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java

@@ -29,7 +29,6 @@ import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.IndexScopedSettings;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.license.LicenseUtils;
@@ -192,56 +191,44 @@ public final class TransportPutFollowAction extends TransportMasterNodeAction<Pu
             threadPool.getThreadContext().getHeaders(),
             clusterService.state()
         );
-        threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new AbstractRunnable() {
-
-            @Override
-            public void onFailure(Exception e) {
-                listener.onFailure(e);
-            }
+        ActionListener<RestoreService.RestoreCompletionResponse> delegatelistener = listener.delegateFailure(
+            (delegatedListener, response) -> afterRestoreStarted(clientWithHeaders, request, delegatedListener, response)
+        );
+        if (remoteDataStream == null) {
+            // If the index we're following is not part of a data stream, start the
+            // restoration of the index normally.
+            restoreService.restoreSnapshot(restoreRequest, delegatelistener);
+        } else {
+            String followerIndexName = request.getFollowerIndex();
+            // This method is used to update the metadata in the same cluster state
+            // update as the snapshot is restored.
+            BiConsumer<ClusterState, Metadata.Builder> updater = (currentState, mdBuilder) -> {
+                final String localDataStreamName;
 
-            @Override
-            protected void doRun() {
-                ActionListener<RestoreService.RestoreCompletionResponse> delegatelistener = listener.delegateFailure(
-                    (delegatedListener, response) -> afterRestoreStarted(clientWithHeaders, request, delegatedListener, response)
-                );
-                if (remoteDataStream == null) {
-                    // If the index we're following is not part of a data stream, start the
-                    // restoration of the index normally.
-                    restoreService.restoreSnapshot(restoreRequest, delegatelistener);
+                // If we have been given a data stream name, use that name for the local
+                // data stream. See the javadoc for AUTO_FOLLOW_PATTERN_REPLACEMENT
+                // for more info.
+                final String dsName = request.getDataStreamName();
+                if (Strings.hasText(dsName)) {
+                    localDataStreamName = dsName;
                 } else {
-                    String followerIndexName = request.getFollowerIndex();
-                    // This method is used to update the metadata in the same cluster state
-                    // update as the snapshot is restored.
-                    BiConsumer<ClusterState, Metadata.Builder> updater = (currentState, mdBuilder) -> {
-                        final String localDataStreamName;
-
-                        // If we have been given a data stream name, use that name for the local
-                        // data stream. See the javadoc for AUTO_FOLLOW_PATTERN_REPLACEMENT
-                        // for more info.
-                        final String dsName = request.getDataStreamName();
-                        if (Strings.hasText(dsName)) {
-                            localDataStreamName = dsName;
-                        } else {
-                            // There was no specified name, use the original data stream name.
-                            localDataStreamName = remoteDataStream.getName();
-                        }
-                        final DataStream localDataStream = mdBuilder.dataStreamMetadata().dataStreams().get(localDataStreamName);
-                        final Index followerIndex = mdBuilder.get(followerIndexName).getIndex();
-                        assert followerIndex != null
-                            : "expected followerIndex " + followerIndexName + " to exist in the state, but it did not";
-
-                        final DataStream updatedDataStream = updateLocalDataStream(
-                            followerIndex,
-                            localDataStream,
-                            localDataStreamName,
-                            remoteDataStream
-                        );
-                        mdBuilder.put(updatedDataStream);
-                    };
-                    restoreService.restoreSnapshot(restoreRequest, delegatelistener, updater);
+                    // There was no specified name, use the original data stream name.
+                    localDataStreamName = remoteDataStream.getName();
                 }
-            }
-        });
+                final DataStream localDataStream = mdBuilder.dataStreamMetadata().dataStreams().get(localDataStreamName);
+                final Index followerIndex = mdBuilder.get(followerIndexName).getIndex();
+                assert followerIndex != null : "expected followerIndex " + followerIndexName + " to exist in the state, but it did not";
+
+                final DataStream updatedDataStream = updateLocalDataStream(
+                    followerIndex,
+                    localDataStream,
+                    localDataStreamName,
+                    remoteDataStream
+                );
+                mdBuilder.put(updatedDataStream);
+            };
+            restoreService.restoreSnapshot(restoreRequest, delegatelistener, updater);
+        }
     }
 
     private void afterRestoreStarted(