Browse Source

Deduplicate Heavy CCR Repository CS Requests (#91398)

We run the same request back to back for each put-follower call during
the restore. Also, concurrent put-follower calls will all run the same
full CS request concurrently.
In older versions prior to https://github.com/elastic/elasticsearch/pull/87235
the concurrency was limited by the size of the snapshot pool. With that
fix though, they are run at almost arbitrary concurrency when many
put-follower requests are executed concurrently.
-> fixed by using the existing deduplicator to only run a single remote
CS request at a time for each CCR repository.
Also, this removes the needless forking in the put-follower action that
is no longer necessary now that the CCR repository is non-blocking
(we do the same for normal restores, which can safely be started from
a transport thread). This should fix some bad-UX situations where the
snapshot threads are busy on master, preventing the put-follower
requests from going through in time.
Armin Braun 2 years ago
parent
commit
d1c5ca257e

+ 5 - 0
docs/changelog/91398.yaml

@@ -0,0 +1,5 @@
+pr: 91398
+summary: Deduplicate Heavy CCR Repository CS Requests
+area: CCR
+type: bug
+issues: []

+ 98 - 0
server/src/main/java/org/elasticsearch/action/SingleResultDeduplicator.java

@@ -0,0 +1,98 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.action;
+
+import org.elasticsearch.action.support.ContextPreservingActionListener;
+import org.elasticsearch.common.util.concurrent.ThreadContext;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Consumer;
+
+/**
+ * Deduplicates concurrent invocations of an async action whose result changes over time.
+ * <p>
+ * Wraps an async action that consumes an {@link ActionListener} such that multiple invocations of {@link #execute(ActionListener)} can
+ * share the result from a single call to the wrapped action. This implementation is similar to {@link ResultDeduplicator} but offers
+ * the stronger guarantee that a stale result is never observed. Concretely, every invocation of {@link #execute(ActionListener)} is
+ * guaranteed to be resolved with a response that has been computed at a time after the call to {@code execute} has been made. This
+ * allows this class to be used to transparently deduplicate results from actions that produce results that change over time.
+ * <p>
+ * An invocation that arrives while an execution of the wrapped action is in progress is queued. All invocations queued during one
+ * execution are resolved together by a single follow-up execution that is started when the in-progress execution completes.
+ * <p>
+ * The mutable state of this class is only accessed while synchronized on this instance; the wrapped action itself is invoked outside
+ * of the synchronized blocks.
+ *
+ * @param <T> Result type
+ */
+public final class SingleResultDeduplicator<T> {
+
+    private final ThreadContext threadContext;
+
+    /**
+     * List of listeners waiting for the execution after the current in-progress execution. If {@code null} then no execution is in
+     * progress currently, otherwise an execution is in progress and will trigger another execution that will resolve any listeners queued
+     * up here once done. An empty list means an execution is in progress but nothing has been queued up behind it yet.
+     * Only read and written while synchronized on this instance.
+     */
+    private List<ActionListener<T>> waitingListeners;
+
+    private final Consumer<ActionListener<T>> executeAction;
+
+    public SingleResultDeduplicator(ThreadContext threadContext, Consumer<ActionListener<T>> executeAction) {
+        this.threadContext = threadContext; // used to wrap queued listeners, see execute
+        this.executeAction = executeAction; // the wrapped async action; invoked with the listener it must complete
+    }
+
+    /**
+     * Execute the action for the given {@code listener}. If no execution is currently in progress, the wrapped action is invoked right
+     * away on the calling thread and {@code listener} is resolved with its result. Otherwise {@code listener} is queued and resolved by
+     * the execution that is started once the in-progress one has completed, together with all other listeners queued up in the meantime.
+     *
+     * @param listener listener to resolve with execution result
+     */
+    public void execute(ActionListener<T> listener) {
+        synchronized (this) {
+            if (waitingListeners == null) {
+                // no execution is in progress, so this listener gets its own execution without deduplication; instantiating the list
+                // marks an execution as in progress so that subsequent invocations queue up behind it
+                waitingListeners = new ArrayList<>();
+            } else {
+                // already running an execution, queue this one up; wrapped since another thread may end up completing it later
+                waitingListeners.add(ContextPreservingActionListener.wrapPreservingContext(listener, threadContext));
+                return;
+            }
+        }
+        doExecute(listener);
+    }
+
+    private void doExecute(ActionListener<T> listener) { // runs the wrapped action once and resolves the given listener with its outcome
+        final ActionListener<T> wrappedListener = ActionListener.runBefore(listener, () -> { // hook runs ahead of completing listener
+            final List<ActionListener<T>> listeners;
+            synchronized (this) {
+                if (waitingListeners.isEmpty()) {
+                    // nothing was queued up while this execution ran, so reset the state to "no execution in progress" and stop here
+                    waitingListeners = null;
+                    return;
+                } else {
+                    // listeners were queued up: take ownership of the current batch and install a fresh list for invocations that
+                    // arrive during the next execution, which is started below to resolve the batch
+                    listeners = waitingListeners;
+                    waitingListeners = new ArrayList<>();
+                }
+            }
+            doExecute(new ActionListener<>() { // one follow-up execution whose outcome is fanned out to the whole batch
+                @Override
+                public void onResponse(T response) {
+                    ActionListener.onResponse(listeners, response);
+                }
+
+                @Override
+                public void onFailure(Exception e) {
+                    ActionListener.onFailure(listeners, e);
+                }
+            });
+        });
+        try {
+            executeAction.accept(wrappedListener);
+        } catch (Exception e) { // an exception thrown synchronously by the action is delivered through the listener as a failure
+            wrappedListener.onFailure(e);
+        }
+    }
+}

+ 77 - 0
server/src/test/java/org/elasticsearch/transport/SingleResultDeduplicatorTests.java

@@ -0,0 +1,77 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.transport;
+
+import org.apache.lucene.util.SetOnce;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.SingleResultDeduplicator;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.ThreadContext;
+import org.elasticsearch.test.ESTestCase;
+
+public class SingleResultDeduplicatorTests extends ESTestCase { // unit test for SingleResultDeduplicator
+
+    public void testDeduplicatesWithoutShowingStaleData() { // listeners queued during an execution must only see a later result
+        final SetOnce<ActionListener<Object>> firstListenerRef = new SetOnce<>();
+        final SetOnce<ActionListener<Object>> secondListenerRef = new SetOnce<>();
+        final var deduplicator = new SingleResultDeduplicator<>(new ThreadContext(Settings.EMPTY), l -> {
+            if (firstListenerRef.trySet(l) == false) { // the action only captures its listener: first execution here ...
+                secondListenerRef.set(l); // ... second execution here; the test completes both by hand further down
+            }
+        });
+        final Object result1 = new Object();
+        final Object result2 = new Object();
+
+        final int totalListeners = randomIntBetween(2, 10);
+        final boolean[] called = new boolean[totalListeners];
+        deduplicator.execute(new ActionListener<>() { // first invocation: starts the first execution and expects result1
+            @Override
+            public void onResponse(Object response) {
+                assertFalse(called[0]);
+                called[0] = true;
+                assertEquals(result1, response);
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                throw new AssertionError(e);
+            }
+        });
+
+        for (int i = 1; i < totalListeners; i++) { // invoked while the first execution is still pending: all expect result2
+            final int index = i;
+            deduplicator.execute(new ActionListener<>() {
+
+                @Override
+                public void onResponse(Object response) {
+                    assertFalse(called[index]);
+                    called[index] = true;
+                    assertEquals(result2, response);
+                }
+
+                @Override
+                public void onFailure(Exception e) {
+                    throw new AssertionError(e);
+                }
+            });
+        }
+        for (int i = 0; i < totalListeners; i++) { // no execution has been completed yet, so no listener may have been called
+            assertFalse(called[i]);
+        }
+        firstListenerRef.get().onResponse(result1); // completing the first execution resolves only the first listener
+        assertTrue(called[0]);
+        for (int i = 1; i < totalListeners; i++) {
+            assertFalse(called[i]);
+        }
+        secondListenerRef.get().onResponse(result2); // completing the second execution resolves every queued listener
+        for (int i = 0; i < totalListeners; i++) {
+            assertTrue(called[i]);
+        }
+    }
+}

+ 24 - 13
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java

@@ -16,6 +16,7 @@ import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionResponse;
 import org.elasticsearch.action.ActionRunnable;
+import org.elasticsearch.action.SingleResultDeduplicator;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
 import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
 import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
@@ -137,6 +138,8 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
 
     private final CounterMetric throttledTime = new CounterMetric();
 
+    private final SingleResultDeduplicator<ClusterState> csDeduplicator;
+
     public CcrRepository(RepositoryMetadata metadata, Client client, Settings settings, CcrSettings ccrSettings, ThreadPool threadPool) {
         this.metadata = metadata;
         this.ccrSettings = ccrSettings;
@@ -145,6 +148,17 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
         this.remoteClusterAlias = Strings.split(metadata.name(), NAME_PREFIX)[1];
         this.client = client;
         this.threadPool = threadPool;
+        csDeduplicator = new SingleResultDeduplicator<>(
+            threadPool.getThreadContext(),
+            l -> getRemoteClusterClient().admin()
+                .cluster()
+                .prepareState()
+                .clear()
+                .setMetadata(true)
+                .setNodes(true)
+                .setMasterNodeTimeout(TimeValue.MAX_VALUE)
+                .execute(l.map(ClusterStateResponse::getState))
+        );
     }
 
     @Override
@@ -177,26 +191,22 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
         assert snapshotIds.size() == 1 && SNAPSHOT_ID.equals(snapshotIds.iterator().next())
             : "RemoteClusterRepository only supports " + SNAPSHOT_ID + " as the SnapshotId but saw " + snapshotIds;
         try {
-            getRemoteClusterClient().admin()
-                .cluster()
-                .prepareState()
-                .clear()
-                .setMetadata(true)
-                .setNodes(true)
-                // fork to the snapshot meta pool because the context expects to run on it and asserts that it does
-                .execute(new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.SNAPSHOT_META, context.map(response -> {
-                    Metadata responseMetadata = response.getState().metadata();
+            csDeduplicator.execute(
+                new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.SNAPSHOT_META, context.map(response -> {
+                    Metadata responseMetadata = response.metadata();
                     Map<String, IndexMetadata> indicesMap = responseMetadata.indices();
                     return new SnapshotInfo(
                         new Snapshot(this.metadata.name(), SNAPSHOT_ID),
                         List.copyOf(indicesMap.keySet()),
                         List.copyOf(responseMetadata.dataStreams().keySet()),
                         List.of(),
-                        response.getState().getNodes().getMaxNodeVersion(),
+                        response.getNodes().getMaxNodeVersion(),
                         SnapshotState.SUCCESS
                     );
-                }), false));
+                }), false)
+            );
         } catch (Exception e) {
+            assert false : e;
             context.onFailure(e);
         }
     }
@@ -255,8 +265,8 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
     @Override
     public void getRepositoryData(ActionListener<RepositoryData> listener) {
         try {
-            getRemoteClusterClient().admin().cluster().prepareState().clear().setMetadata(true).execute(listener.map(response -> {
-                final Metadata remoteMetadata = response.getState().getMetadata();
+            csDeduplicator.execute(listener.map(response -> {
+                final Metadata remoteMetadata = response.getMetadata();
                 final String[] concreteAllIndices = remoteMetadata.getConcreteAllIndices();
                 final Map<String, SnapshotId> copiedSnapshotIds = Maps.newMapWithExpectedSize(concreteAllIndices.length);
                 final Map<String, RepositoryData.SnapshotDetails> snapshotsDetails = Maps.newMapWithExpectedSize(concreteAllIndices.length);
@@ -285,6 +295,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
                 );
             }));
         } catch (Exception e) {
+            assert false;
             listener.onFailure(e);
         }
     }

+ 16 - 14
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRepositoryRetentionLeaseTests.java

@@ -15,6 +15,7 @@ import org.elasticsearch.cluster.metadata.RepositoryMetadata;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.seqno.RetentionLeaseActions;
 import org.elasticsearch.index.seqno.RetentionLeaseAlreadyExistsException;
@@ -54,13 +55,7 @@ public class CcrRepositoryRetentionLeaseTests extends ESTestCase {
             CcrSettings.getSettings().stream().filter(Setting::hasNodeScope)
         ).collect(Collectors.toSet());
 
-        final CcrRepository repository = new CcrRepository(
-            repositoryMetadata,
-            mock(Client.class),
-            Settings.EMPTY,
-            new CcrSettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, settings)),
-            mock(ThreadPool.class)
-        );
+        final CcrRepository repository = createCcrRepository(repositoryMetadata, settings);
 
         final ShardId followerShardId = new ShardId(new Index("follower-index-name", "follower-index-uuid"), 0);
         final ShardId leaderShardId = new ShardId(new Index("leader-index-name", "leader-index-uuid"), 0);
@@ -110,6 +105,19 @@ public class CcrRepositoryRetentionLeaseTests extends ESTestCase {
         verifyNoMoreInteractions(remoteClient);
     }
 
+    private static CcrRepository createCcrRepository(RepositoryMetadata repositoryMetadata, Set<Setting<?>> settings) {
+        final ThreadPool threadPool = mock(ThreadPool.class);
+        final ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
+        when(threadPool.getThreadContext()).thenReturn(threadContext); // stubbed with a real context for the repository under test
+        return new CcrRepository(
+            repositoryMetadata,
+            mock(Client.class),
+            Settings.EMPTY,
+            new CcrSettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, settings)),
+            threadPool
+        );
+    }
+
     public void testWhenRetentionLeaseExpiresBeforeWeCanRenewIt() {
         final RepositoryMetadata repositoryMetadata = mock(RepositoryMetadata.class);
         when(repositoryMetadata.name()).thenReturn(CcrRepository.NAME_PREFIX);
@@ -118,13 +126,7 @@ public class CcrRepositoryRetentionLeaseTests extends ESTestCase {
             CcrSettings.getSettings().stream().filter(Setting::hasNodeScope)
         ).collect(Collectors.toSet());
 
-        final CcrRepository repository = new CcrRepository(
-            repositoryMetadata,
-            mock(Client.class),
-            Settings.EMPTY,
-            new CcrSettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, settings)),
-            mock(ThreadPool.class)
-        );
+        final CcrRepository repository = createCcrRepository(repositoryMetadata, settings);
 
         final ShardId followerShardId = new ShardId(new Index("follower-index-name", "follower-index-uuid"), 0);
         final ShardId leaderShardId = new ShardId(new Index("leader-index-name", "leader-index-uuid"), 0);