Browse Source

Interpret `?timeout=-1` as infinite ack timeout (#107675)

APIs which perform cluster state updates typically accept the
`?master_timeout=` and `?timeout=` parameters to respectively set the
pending task queue timeout and the acking timeout for the cluster state
update. Both of these parameters accept the value `-1`, but
`?master_timeout=-1` means to wait indefinitely whereas `?timeout=-1`
means the same thing as `?timeout=0`, namely that acking times out
immediately on commit.

There are some situations where it makes sense to wait for as long as
possible for nodes to ack a cluster state update. In practice this wait
is bounded by other mechanisms (e.g. the lag detector will remove the
node from the cluster after a couple of minutes of failing to apply
cluster state updates) but these are not really the concern of clients.

Therefore with this commit we change the meaning of `?timeout=-1` to
mean that the acking timeout is infinite.
David Turner 1 year ago
parent
commit
fc287bde8b

+ 17 - 0
docs/changelog/107675.yaml

@@ -0,0 +1,17 @@
+pr: 107675
+summary: Interpret `?timeout=-1` as infinite ack timeout
+area: Cluster Coordination
+type: breaking
+issues: []
+breaking:
+  title: Interpret `?timeout=-1` as infinite ack timeout
+  area: REST API
+  details: |
+    Today {es} accepts the parameter `?timeout=-1` in many APIs, but interprets
+    this to mean the same as `?timeout=0`. From 8.15 onwards `?timeout=-1` will
+    mean to wait indefinitely, aligning the behaviour of this parameter with
+    other similar parameters such as `?master_timeout`.
+  impact: |
+    Use `?timeout=0` to force relevant operations to time out immediately
+    instead of `?timeout=-1`
+  notable: false

+ 5 - 4
docs/reference/rest-api/common-parms.asciidoc

@@ -1223,12 +1223,13 @@ the timeout expires, the request fails and returns an error. Defaults to `30s`.
 Can also be set to `-1` to indicate that the request should never timeout.
 end::master-timeout[]
 
-tag::timeout[]
 `timeout`::
 (Optional, <<time-units, time units>>)
-Period to wait for a response. If no response is received before the timeout
-expires, the request fails and returns an error. Defaults to `30s`.
-end::timeout[]
+Period to wait for a response from all relevant nodes in the cluster after
+updating the cluster metadata. If no response is received before the timeout
+expires, the cluster metadata update still applies but the response will
+indicate that it was not completely acknowledged. Defaults to `30s`.
+Can also be set to `-1` to indicate that the request should never timeout.
 end::timeoutparms[]
 
 tag::transform-id[]

+ 1 - 4
modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/UpdateDataStreamGlobalRetentionService.java

@@ -87,12 +87,9 @@ public class UpdateDataStreamGlobalRetentionService {
         List<UpdateDataStreamGlobalRetentionResponse.AffectedDataStream> affectedDataStreams,
         final ActionListener<UpdateDataStreamGlobalRetentionResponse> listener
     ) {
-        final var ackTimeout = request.masterNodeTimeout().millis() < 0 ? TimeValue.MAX_VALUE : request.masterNodeTimeout();
-        // NB a negative master node timeout means never to time out, but a negative ack timeout means to time out immediately.
-        // TODO when https://github.com/elastic/elasticsearch/issues/107044 is fixed, we can just use request.masterNodeTimeout() directly
         taskQueue.submitTask(
             "remove-data-stream-global-retention",
-            new UpsertGlobalDataStreamMetadataTask(null, affectedDataStreams, listener, ackTimeout),
+            new UpsertGlobalDataStreamMetadataTask(null, affectedDataStreams, listener, request.masterNodeTimeout()),
             request.masterNodeTimeout()
         );
     }

+ 54 - 0
server/src/internalClusterTest/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java

@@ -8,20 +8,28 @@
 
 package org.elasticsearch.action.admin.indices.create;
 
+import io.netty.handler.codec.http.HttpMethod;
+
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionRequestBuilder;
 import org.elasticsearch.action.UnavailableShardsException;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
 import org.elasticsearch.action.admin.indices.alias.Alias;
 import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
+import org.elasticsearch.action.support.ActionTestUtils;
 import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.action.support.IndicesOptions;
+import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.client.Response;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.MappingMetadata;
 import org.elasticsearch.cluster.metadata.Metadata;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.FutureUtils;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.index.IndexService;
@@ -31,17 +39,24 @@ import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
 import org.elasticsearch.test.ESIntegTestCase.Scope;
+import org.elasticsearch.test.rest.ESRestTestCase;
 import org.elasticsearch.xcontent.XContentFactory;
 
+import java.io.IOException;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiFunction;
 
+import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
 import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_WAIT_FOR_ACTIVE_SHARDS;
+import static org.elasticsearch.common.xcontent.support.XContentMapValues.extractValue;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertBlocked;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailuresAndResponse;
+import static org.elasticsearch.test.rest.ESRestTestCase.entityAsMap;
 import static org.hamcrest.Matchers.allOf;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@@ -53,6 +68,11 @@ import static org.hamcrest.core.IsNull.notNullValue;
 @ClusterScope(scope = Scope.TEST)
 public class CreateIndexIT extends ESIntegTestCase {
 
+    @Override
+    protected boolean addMockHttpTransport() {
+        return false; // expose HTTP requests
+    }
+
     public void testCreationDateGivenFails() {
         try {
             prepareCreate("test").setSettings(Settings.builder().put(IndexMetadata.SETTING_CREATION_DATE, 4L)).get();
@@ -370,4 +390,38 @@ public class CreateIndexIT extends ESIntegTestCase {
         assertEquals("Should have index name in response", "foo", response.index());
     }
 
+    public void testInfiniteAckTimeout() throws IOException {
+        final var clusterService = internalCluster().getInstance(ClusterService.class);
+        final var barrier = new CyclicBarrier(2);
+        clusterService.getClusterApplierService().runOnApplierThread("block for test", Priority.NORMAL, cs -> {
+            safeAwait(barrier);
+            safeAwait(barrier);
+        }, ActionListener.noop());
+
+        safeAwait(barrier);
+
+        final var request = ESRestTestCase.newXContentRequest(
+            HttpMethod.PUT,
+            "testindex",
+            (builder, params) -> builder.startObject("settings")
+                .field(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
+                .field(SETTING_NUMBER_OF_REPLICAS, internalCluster().numDataNodes() - 1)
+                .endObject()
+        );
+        request.addParameter("timeout", "-1");
+        final var responseFuture = new PlainActionFuture<Response>();
+        getRestClient().performRequestAsync(request, ActionTestUtils.wrapAsRestResponseListener(responseFuture));
+
+        if (randomBoolean()) {
+            safeSleep(scaledRandomIntBetween(1, 100));
+        }
+
+        assertFalse(responseFuture.isDone());
+        safeAwait(barrier);
+
+        final var response = FutureUtils.get(responseFuture, 10, TimeUnit.SECONDS);
+        assertEquals(200, response.getStatusLine().getStatusCode());
+        assertTrue((boolean) extractValue("acknowledged", entityAsMap(response)));
+    }
+
 }

+ 8 - 0
server/src/main/java/org/elasticsearch/cluster/service/MasterService.java

@@ -711,6 +711,14 @@ public class MasterService extends AbstractLifecycleComponent {
                 assert false : "ackTimeout must always be present: " + contextPreservingAckListener;
                 ackTimeout = TimeValue.ZERO;
             }
+
+            if (ackTimeout.millis() < 0) {
+                if (countDown.countDown()) {
+                    finish();
+                }
+                return;
+            }
+
             final TimeValue timeLeft = TimeValue.timeValueNanos(Math.max(0, ackTimeout.nanos() - commitTime.nanos()));
             if (timeLeft.nanos() == 0L) {
                 onTimeout();

+ 51 - 0
server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java

@@ -1663,6 +1663,57 @@ public class MasterServiceTests extends ESTestCase {
                 deterministicTaskQueue.runAllTasksInTimeOrder();
                 safeAwait(latch);
             }
+
+            // check that -1 means an infinite ack timeout
+            {
+                final CountDownLatch latch = new CountDownLatch(2);
+
+                publisherRef.set((clusterChangedEvent, publishListener, ackListener) -> {
+                    publishListener.onResponse(null);
+                    ackListener.onCommit(TimeValue.timeValueMillis(randomLongBetween(0, TimeValue.timeValueDays(1).millis())));
+                    for (final var node : new DiscoveryNode[] { node1, node2, node3 }) {
+                        deterministicTaskQueue.scheduleAt(
+                            deterministicTaskQueue.getCurrentTimeMillis() + randomLongBetween(0, TimeValue.timeValueDays(1).millis()),
+                            () -> ackListener.onNodeAck(node, null)
+                        );
+                    }
+                });
+
+                masterService.submitUnbatchedStateUpdateTask(
+                    "test2",
+                    new AckedClusterStateUpdateTask(ackedRequest(TimeValue.MINUS_ONE, null), null) {
+                        @Override
+                        public ClusterState execute(ClusterState currentState) {
+                            return ClusterState.builder(currentState).build();
+                        }
+
+                        @Override
+                        public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
+                            latch.countDown();
+                        }
+
+                        @Override
+                        protected AcknowledgedResponse newResponse(boolean acknowledged) {
+                            assertTrue(acknowledged);
+                            latch.countDown();
+                            return AcknowledgedResponse.TRUE;
+                        }
+
+                        @Override
+                        public void onFailure(Exception e) {
+                            fail();
+                        }
+
+                        @Override
+                        public void onAckTimeout() {
+                            fail();
+                        }
+                    }
+                );
+
+                deterministicTaskQueue.runAllTasks(); // NB not in time order, there's no timeout to avoid
+                safeAwait(latch);
+            }
         }
     }