Browse Source

Add get file chunk timeouts with listener timeouts (#38758)

This commit adds a `ListenerTimeouts` class that will wrap a
`ActionListener` in a listener with a timeout scheduled on the generic
thread pool. If the timeout expires before the listener is completed,
`onFailure` will be called with an `ElasticsearchTimeoutException`.

Timeouts for the get ccr file chunk action are implemented using this
functionality. Additionally, this commit attempts to fix #38027 by also
blocking proxied get ccr file chunk actions. This test being un-muted is
useful to verify the timeout functionality.
Tim Brooks 6 năm trước cách đây
mục cha
commit
07fd2610b2

+ 89 - 0
server/src/main/java/org/elasticsearch/action/support/ListenerTimeouts.java

@@ -0,0 +1,89 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.action.support;
+
+import org.elasticsearch.ElasticsearchTimeoutException;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.threadpool.Scheduler;
+import org.elasticsearch.threadpool.ThreadPool;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class ListenerTimeouts {
+
+    /**
+     * Wraps a listener with a listener that can timeout. After the timeout period the
+     * {@link ActionListener#onFailure(Exception)} will be called with a
+     * {@link ElasticsearchTimeoutException} if the listener has not already been completed.
+     *
+     * @param threadPool used to schedule the timeout
+     * @param listener to that can timeout
+     * @param timeout period before listener failed
+     * @param executor to use for scheduling timeout
+     * @param listenerName name of the listener for timeout exception
+     * @return the wrapped listener that will timeout
+     */
+    public static <Response> ActionListener<Response> wrapWithTimeout(ThreadPool threadPool,  ActionListener<Response> listener,
+                                                                      TimeValue timeout, String executor, String listenerName) {
+        TimeoutableListener<Response> wrappedListener = new TimeoutableListener<>(listener, timeout, listenerName);
+        wrappedListener.cancellable = threadPool.schedule(wrappedListener, timeout, executor);
+        return wrappedListener;
+    }
+
+    private static class TimeoutableListener<Response> implements ActionListener<Response>, Runnable {
+
+        private final AtomicBoolean isDone = new AtomicBoolean(false);
+        private final ActionListener<Response> delegate;
+        private final TimeValue timeout;
+        private final String listenerName;
+        private volatile Scheduler.ScheduledCancellable cancellable;
+
+        private TimeoutableListener(ActionListener<Response> delegate, TimeValue timeout, String listenerName) {
+            this.delegate = delegate;
+            this.timeout = timeout;
+            this.listenerName = listenerName;
+        }
+
+        @Override
+        public void onResponse(Response response) {
+            if (isDone.compareAndSet(false, true)) {
+                cancellable.cancel();
+                delegate.onResponse(response);
+            }
+        }
+
+        @Override
+        public void onFailure(Exception e) {
+            if (isDone.compareAndSet(false, true)) {
+                cancellable.cancel();
+                delegate.onFailure(e);
+            }
+        }
+
+        @Override
+        public void run() {
+            if (isDone.compareAndSet(false, true)) {
+                String timeoutMessage = "[" + listenerName + "]" + " timed out after [" + timeout + "]";
+                delegate.onFailure(new ElasticsearchTimeoutException(timeoutMessage));
+            }
+        }
+    }
+}

+ 120 - 0
server/src/test/java/org/elasticsearch/action/support/ListenerTimeoutsTests.java

@@ -0,0 +1,120 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.action.support;
+
+import org.elasticsearch.ElasticsearchTimeoutException;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.cluster.coordination.DeterministicTaskQueue;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.junit.Before;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
+
+public class ListenerTimeoutsTests extends ESTestCase {
+
+    private final TimeValue timeout = TimeValue.timeValueMillis(10);
+    private final String generic = ThreadPool.Names.GENERIC;
+    private DeterministicTaskQueue taskQueue;
+
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+        Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), "node").build();
+        taskQueue = new DeterministicTaskQueue(settings, random());
+    }
+
+    public void testListenerTimeout() {
+        AtomicBoolean success = new AtomicBoolean(false);
+        AtomicReference<Exception> exception = new AtomicReference<>();
+        ActionListener<Void> listener = wrap(success, exception);
+
+        ActionListener<Void> wrapped = ListenerTimeouts.wrapWithTimeout(taskQueue.getThreadPool(), listener, timeout, generic, "test");
+        assertTrue(taskQueue.hasDeferredTasks());
+        taskQueue.advanceTime();
+        taskQueue.runAllRunnableTasks();
+
+        wrapped.onResponse(null);
+        wrapped.onFailure(new IOException("incorrect exception"));
+
+        assertFalse(success.get());
+        assertThat(exception.get(), instanceOf(ElasticsearchTimeoutException.class));
+    }
+
+    public void testFinishNormallyBeforeTimeout() {
+        AtomicBoolean success = new AtomicBoolean(false);
+        AtomicReference<Exception> exception = new AtomicReference<>();
+        ActionListener<Void> listener = wrap(success, exception);
+
+        ActionListener<Void> wrapped = ListenerTimeouts.wrapWithTimeout(taskQueue.getThreadPool(), listener, timeout, generic, "test");
+        wrapped.onResponse(null);
+        wrapped.onFailure(new IOException("boom"));
+        wrapped.onResponse(null);
+
+        assertTrue(taskQueue.hasDeferredTasks());
+        taskQueue.advanceTime();
+        taskQueue.runAllRunnableTasks();
+
+        assertTrue(success.get());
+        assertNull(exception.get());
+    }
+
+    public void testFinishExceptionallyBeforeTimeout() {
+        AtomicBoolean success = new AtomicBoolean(false);
+        AtomicReference<Exception> exception = new AtomicReference<>();
+        ActionListener<Void> listener = wrap(success, exception);
+
+        ActionListener<Void> wrapped = ListenerTimeouts.wrapWithTimeout(taskQueue.getThreadPool(), listener, timeout, generic, "test");
+        wrapped.onFailure(new IOException("boom"));
+
+        assertTrue(taskQueue.hasDeferredTasks());
+        taskQueue.advanceTime();
+        taskQueue.runAllRunnableTasks();
+
+        assertFalse(success.get());
+        assertThat(exception.get(), instanceOf(IOException.class));
+    }
+
+    private ActionListener<Void> wrap(AtomicBoolean success, AtomicReference<Exception> exception) {
+        return new ActionListener<Void>() {
+
+            private final AtomicBoolean completed = new AtomicBoolean();
+
+            @Override
+            public void onResponse(Void aVoid) {
+                assertTrue(completed.compareAndSet(false, true));
+                assertTrue(success.compareAndSet(false, true));
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                assertTrue(completed.compareAndSet(false, true));
+                assertTrue(exception.compareAndSet(null, e));
+            }
+        };
+    }
+}

+ 9 - 5
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java

@@ -16,6 +16,7 @@ import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
 import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
+import org.elasticsearch.action.support.ListenerTimeouts;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
@@ -105,7 +106,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
     private final ThreadPool threadPool;
 
     private final CounterMetric throttledTime = new CounterMetric();
-    
+
     public CcrRepository(RepositoryMetaData metadata, Client client, CcrLicenseChecker ccrLicenseChecker, Settings settings,
                          CcrSettings ccrSettings, ThreadPool threadPool) {
         this.metadata = metadata;
@@ -389,7 +390,8 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
         protected void restoreFiles(List<FileInfo> filesToRecover, Store store) throws IOException {
             logger.trace("[{}] starting CCR restore of {} files", shardId, filesToRecover);
 
-            try (MultiFileWriter multiFileWriter = new MultiFileWriter(store, recoveryState.getIndex(), "", logger, () -> {})) {
+            try (MultiFileWriter multiFileWriter = new MultiFileWriter(store, recoveryState.getIndex(), "", logger, () -> {
+            })) {
                 final LocalCheckpointTracker requestSeqIdTracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED);
                 final AtomicReference<Tuple<StoreFileMetaData, Exception>> error = new AtomicReference<>();
 
@@ -444,8 +446,9 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
                             logger.trace("[{}] [{}] fetching chunk for file [{}], expected offset: {}, size: {}", shardId, snapshotId,
                                 fileToRecover.name(), fileSession.lastOffset, bytesRequested);
 
-                            remoteClient.execute(GetCcrRestoreFileChunkAction.INSTANCE, request,
-                                ActionListener.wrap(
+                            TimeValue timeout = ccrSettings.getRecoveryActionTimeout();
+                            ActionListener<GetCcrRestoreFileChunkAction.GetCcrRestoreFileChunkResponse> listener =
+                                ListenerTimeouts.wrapWithTimeout(threadPool, ActionListener.wrap(
                                     r -> threadPool.generic().execute(new AbstractRunnable() {
                                         @Override
                                         public void onFailure(Exception e) {
@@ -491,7 +494,8 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
                                         error.compareAndSet(null, Tuple.tuple(fileToRecover.metadata(), e));
                                         requestSeqIdTracker.markSeqNoAsCompleted(requestSeqId);
                                     }
-                                ));
+                                    ), timeout, ThreadPool.Names.GENERIC, GetCcrRestoreFileChunkAction.NAME);
+                            remoteClient.execute(GetCcrRestoreFileChunkAction.INSTANCE, request, listener);
                         } catch (Exception e) {
                             error.compareAndSet(null, Tuple.tuple(fileToRecover.metadata(), e));
                             requestSeqIdTracker.markSeqNoAsCompleted(requestSeqId);

+ 27 - 25
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java

@@ -37,6 +37,7 @@ import org.elasticsearch.snapshots.RestoreInfo;
 import org.elasticsearch.snapshots.RestoreService;
 import org.elasticsearch.snapshots.Snapshot;
 import org.elasticsearch.test.transport.MockTransportService;
+import org.elasticsearch.transport.TransportActionProxy;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.CcrIntegTestCase;
 import org.elasticsearch.xpack.ccr.action.repositories.GetCcrRestoreFileChunkAction;
@@ -292,7 +293,6 @@ public class CcrRepositoryIT extends CcrIntegTestCase {
         }
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/38027")
     public void testIndividualActionsTimeout() throws Exception {
         ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest();
         TimeValue timeValue = TimeValue.timeValueMillis(100);
@@ -315,7 +315,8 @@ public class CcrRepositoryIT extends CcrIntegTestCase {
             MockTransportService mockTransportService = (MockTransportService) transportService;
             transportServices.add(mockTransportService);
             mockTransportService.addSendBehavior((connection, requestId, action, request, options) -> {
-                if (action.equals(GetCcrRestoreFileChunkAction.NAME) == false) {
+                if (action.equals(GetCcrRestoreFileChunkAction.NAME) == false &&
+                    action.equals(TransportActionProxy.getProxyAction(GetCcrRestoreFileChunkAction.NAME)) == false) {
                     connection.sendRequest(requestId, action, request, options);
                 }
             });
@@ -337,33 +338,34 @@ public class CcrRepositoryIT extends CcrIntegTestCase {
             .renameReplacement(followerIndex).masterNodeTimeout(new TimeValue(1L, TimeUnit.HOURS))
             .indexSettings(settingsBuilder);
 
-        final RestoreService restoreService = getFollowerCluster().getCurrentMasterNodeInstance(RestoreService.class);
-        final ClusterService clusterService = getFollowerCluster().getCurrentMasterNodeInstance(ClusterService.class);
-        PlainActionFuture<RestoreInfo> future = PlainActionFuture.newFuture();
-        restoreService.restoreSnapshot(restoreRequest, waitForRestore(clusterService, future));
-
-        // Depending on when the timeout occurs this can fail in two ways. If it times-out when fetching
-        // metadata this will throw an exception. If it times-out when restoring a shard, the shard will
-        // be marked as failed. Either one is a success for the purpose of this test.
         try {
-            RestoreInfo restoreInfo = future.actionGet();
-            assertThat(restoreInfo.failedShards(), greaterThan(0));
-            assertThat(restoreInfo.successfulShards(), lessThan(restoreInfo.totalShards()));
-            assertEquals(numberOfPrimaryShards, restoreInfo.totalShards());
-        } catch (Exception e) {
-            assertThat(ExceptionsHelper.unwrapCause(e), instanceOf(ElasticsearchTimeoutException.class));
-        }
+            final RestoreService restoreService = getFollowerCluster().getCurrentMasterNodeInstance(RestoreService.class);
+            final ClusterService clusterService = getFollowerCluster().getCurrentMasterNodeInstance(ClusterService.class);
+            PlainActionFuture<RestoreInfo> future = PlainActionFuture.newFuture();
+            restoreService.restoreSnapshot(restoreRequest, waitForRestore(clusterService, future));
 
+            // Depending on when the timeout occurs this can fail in two ways. If it times-out when fetching
+            // metadata this will throw an exception. If it times-out when restoring a shard, the shard will
+            // be marked as failed. Either one is a success for the purpose of this test.
+            try {
+                RestoreInfo restoreInfo = future.actionGet();
+                assertThat(restoreInfo.failedShards(), greaterThan(0));
+                assertThat(restoreInfo.successfulShards(), lessThan(restoreInfo.totalShards()));
+                assertEquals(numberOfPrimaryShards, restoreInfo.totalShards());
+            } catch (Exception e) {
+                assertThat(ExceptionsHelper.unwrapCause(e), instanceOf(ElasticsearchTimeoutException.class));
+            }
+        } finally {
+            for (MockTransportService transportService : transportServices) {
+                transportService.clearAllRules();
+            }
 
-        for (MockTransportService transportService : transportServices) {
-            transportService.clearAllRules();
+            settingsRequest = new ClusterUpdateSettingsRequest();
+            TimeValue defaultValue = CcrSettings.INDICES_RECOVERY_ACTION_TIMEOUT_SETTING.getDefault(Settings.EMPTY);
+            settingsRequest.persistentSettings(Settings.builder().put(CcrSettings.INDICES_RECOVERY_ACTION_TIMEOUT_SETTING.getKey(),
+                defaultValue));
+            assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet());
         }
-
-        settingsRequest = new ClusterUpdateSettingsRequest();
-        TimeValue defaultValue = CcrSettings.INDICES_RECOVERY_ACTION_TIMEOUT_SETTING.getDefault(Settings.EMPTY);
-        settingsRequest.persistentSettings(Settings.builder().put(CcrSettings.INDICES_RECOVERY_ACTION_TIMEOUT_SETTING.getKey(),
-            defaultValue));
-        assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet());
     }
 
     public void testFollowerMappingIsUpdated() throws IOException {