Browse Source

Chunked encoding for cluster reroute API (#92615)

The cluster reroute API (optionally) returns the cluster state in its
response, which can therefore be rather large. #92285 enables a chunked
encoding of the cluster state, and this commit adjusts the reroute API
to make use of this encoding too.
David Turner 2 years ago
parent
commit
87f2221aa0

+ 45 - 19
server/src/main/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteResponse.java

@@ -8,26 +8,32 @@
 
 package org.elasticsearch.action.admin.cluster.reroute;
 
-import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.action.support.master.IsAcknowledgedSupplier;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.routing.allocation.RoutingExplanations;
+import org.elasticsearch.common.collect.Iterators;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.logging.DeprecationCategory;
 import org.elasticsearch.common.logging.DeprecationLogger;
-import org.elasticsearch.common.xcontent.ChunkedToXContent;
+import org.elasticsearch.common.xcontent.ChunkedToXContentHelper;
+import org.elasticsearch.common.xcontent.ChunkedToXContentObject;
 import org.elasticsearch.core.RestApiVersion;
 import org.elasticsearch.rest.action.search.RestSearchAction;
-import org.elasticsearch.xcontent.ToXContentObject;
-import org.elasticsearch.xcontent.XContentBuilder;
+import org.elasticsearch.xcontent.ToXContent;
 
 import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
 import java.util.Objects;
 
+import static org.elasticsearch.action.support.master.AcknowledgedResponse.ACKNOWLEDGED_KEY;
+
 /**
  * Response returned after a cluster reroute request
  */
-public class ClusterRerouteResponse extends AcknowledgedResponse implements ToXContentObject {
+public class ClusterRerouteResponse extends ActionResponse implements IsAcknowledgedSupplier, ChunkedToXContentObject {
 
     private static final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(RestSearchAction.class);
     public static final String STATE_FIELD_DEPRECATION_MESSAGE = "The [state] field in the response to the reroute API is deprecated "
@@ -38,15 +44,17 @@ public class ClusterRerouteResponse extends AcknowledgedResponse implements ToXC
      */
     private final ClusterState state;
     private final RoutingExplanations explanations;
+    private final boolean acknowledged;
 
     ClusterRerouteResponse(StreamInput in) throws IOException {
         super(in);
+        acknowledged = in.readBoolean();
         state = ClusterState.readFrom(in, null);
         explanations = RoutingExplanations.readFrom(in);
     }
 
     ClusterRerouteResponse(boolean acknowledged, ClusterState state, RoutingExplanations explanations) {
-        super(acknowledged);
+        this.acknowledged = acknowledged;
         this.state = state;
         this.explanations = explanations;
     }
@@ -62,27 +70,45 @@ public class ClusterRerouteResponse extends AcknowledgedResponse implements ToXC
         return this.explanations;
     }
 
+    @Override
+    public final boolean isAcknowledged() {
+        return acknowledged;
+    }
+
     @Override
     public void writeTo(StreamOutput out) throws IOException {
-        super.writeTo(out);
+        out.writeBoolean(acknowledged);
         state.writeTo(out);
         RoutingExplanations.writeTo(explanations, out);
     }
 
+    private boolean emitState(ToXContent.Params params) {
+        return Objects.equals(params.param("metric"), "none") == false;
+    }
+
     @Override
-    protected void addCustomFields(XContentBuilder builder, Params params) throws IOException {
-        if (Objects.equals(params.param("metric"), "none") == false) {
-            if (builder.getRestApiVersion() != RestApiVersion.V_7) {
-                deprecationLogger.critical(DeprecationCategory.API, "reroute_cluster_state", STATE_FIELD_DEPRECATION_MESSAGE);
-            }
-            builder.startObject("state");
-            // TODO this should be chunked, see #89838
-            ChunkedToXContent.wrapAsToXContent(state).toXContent(builder, params);
-            builder.endObject();
+    public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params outerParams) {
+        if (emitState(outerParams)) {
+            deprecationLogger.critical(DeprecationCategory.API, "reroute_cluster_state", STATE_FIELD_DEPRECATION_MESSAGE);
         }
+        return toXContentChunkedV7(outerParams);
+    }
 
-        if (params.paramAsBoolean("explain", false)) {
-            explanations.toXContent(builder, params);
-        }
+    @Override
+    public Iterator<? extends ToXContent> toXContentChunkedV7(ToXContent.Params outerParams) {
+        return Iterators.concat(
+            Iterators.single((builder, params) -> builder.startObject().field(ACKNOWLEDGED_KEY, isAcknowledged())),
+            emitState(outerParams)
+                ? ChunkedToXContentHelper.wrapWithObject("state", state.toXContentChunked(outerParams))
+                : Collections.emptyIterator(),
+            Iterators.single((builder, params) -> {
+                if (params.paramAsBoolean("explain", false)) {
+                    explanations.toXContent(builder, params);
+                }
+
+                builder.endObject();
+                return builder;
+            })
+        );
     }
 }

+ 2 - 1
server/src/main/java/org/elasticsearch/action/support/master/AcknowledgedRequestBuilder.java

@@ -7,6 +7,7 @@
  */
 package org.elasticsearch.action.support.master;
 
+import org.elasticsearch.action.ActionResponse;
 import org.elasticsearch.action.ActionType;
 import org.elasticsearch.client.internal.ElasticsearchClient;
 import org.elasticsearch.core.TimeValue;
@@ -16,7 +17,7 @@ import org.elasticsearch.core.TimeValue;
  */
 public abstract class AcknowledgedRequestBuilder<
     Request extends AcknowledgedRequest<Request>,
-    Response extends AcknowledgedResponse,
+    Response extends ActionResponse & IsAcknowledgedSupplier,
     RequestBuilder extends AcknowledgedRequestBuilder<Request, Response, RequestBuilder>> extends MasterNodeOperationRequestBuilder<
         Request,
         Response,

+ 5 - 3
server/src/main/java/org/elasticsearch/action/support/master/AcknowledgedResponse.java

@@ -25,13 +25,14 @@ import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg
 /**
  * A response that indicates that a request has been acknowledged
  */
-public class AcknowledgedResponse extends ActionResponse implements ToXContentObject {
+public class AcknowledgedResponse extends ActionResponse implements IsAcknowledgedSupplier, ToXContentObject {
 
     public static final AcknowledgedResponse TRUE = new AcknowledgedResponse(true);
 
     public static final AcknowledgedResponse FALSE = new AcknowledgedResponse(false);
 
-    private static final ParseField ACKNOWLEDGED = new ParseField("acknowledged");
+    public static final String ACKNOWLEDGED_KEY = "acknowledged";
+    private static final ParseField ACKNOWLEDGED = new ParseField(ACKNOWLEDGED_KEY);
 
     protected static <T extends AcknowledgedResponse> void declareAcknowledgedField(ConstructingObjectParser<T, Void> objectParser) {
         objectParser.declareField(
@@ -65,6 +66,7 @@ public class AcknowledgedResponse extends ActionResponse implements ToXContentOb
      * Returns whether the response is acknowledged or not
      * @return true if the response is acknowledged, false otherwise
      */
+    @Override
     public final boolean isAcknowledged() {
         return acknowledged;
     }
@@ -77,7 +79,7 @@ public class AcknowledgedResponse extends ActionResponse implements ToXContentOb
     @Override
     public final XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
         builder.startObject();
-        builder.field(ACKNOWLEDGED.getPreferredName(), isAcknowledged());
+        builder.field(ACKNOWLEDGED_KEY, isAcknowledged());
         addCustomFields(builder, params);
         builder.endObject();
         return builder;

+ 13 - 0
server/src/main/java/org/elasticsearch/action/support/master/IsAcknowledgedSupplier.java

@@ -0,0 +1,13 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.action.support.master;
+
+public interface IsAcknowledgedSupplier {
+    boolean isAcknowledged();
+}

+ 32 - 6
server/src/main/java/org/elasticsearch/common/xcontent/ChunkedToXContent.java

@@ -8,6 +8,7 @@
 
 package org.elasticsearch.common.xcontent;
 
+import org.elasticsearch.core.RestApiVersion;
 import org.elasticsearch.xcontent.ToXContent;
 import org.elasticsearch.xcontent.ToXContentObject;
 import org.elasticsearch.xcontent.XContentBuilder;
@@ -16,21 +17,46 @@ import java.io.IOException;
 import java.util.Iterator;
 
 /**
- * An extension of {@link ToXContent} that can be serialized in chunks by creating an {@link Iterator<ToXContent>}.
- * This is used by the REST layer to implement flow control that does not rely on blocking the serializing thread when writing the
- * serialized bytes to a non-blocking channel.
+ * An alternative to {@link ToXContent} allowing for progressive serialization by creating an {@link Iterator} of {@link ToXContent} chunks.
+ * <p>
+ * The REST layer only serializes enough chunks at once to keep an outbound buffer full, rather than consuming all the time and memory
+ * needed to serialize the entire response as must be done with the regular {@link ToXContent} responses.
  */
 public interface ChunkedToXContent {
 
     /**
-     * Create an iterator of {@link ToXContent} chunks, that must be serialized individually with the same {@link XContentBuilder} and
-     * {@link ToXContent.Params} for each call until it is fully drained.
+     * Create an iterator of {@link ToXContent} chunks for a REST response. Each chunk is serialized with the same {@link XContentBuilder}
+     * and {@link ToXContent.Params}, which is also the same as the {@link ToXContent.Params} passed as the {@code params} argument. For
+     * best results, all chunks should be {@code O(1)} size. See also {@link ChunkedToXContentHelper} for some handy utilities.
+     * <p>
+     * Note that chunked response bodies cannot send deprecation warning headers once transmission has started, so implementations must
+     * check for deprecated feature use before returning.
+     *
      * @return iterator over chunks of {@link ToXContent}
      */
     Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params);
 
+    /**
+     * Create an iterator of {@link ToXContent} chunks for a response to the {@link RestApiVersion#V_7} API. Each chunk is serialized with
+     * the same {@link XContentBuilder} and {@link ToXContent.Params}, which is also the same as the {@link ToXContent.Params} passed as the
+     * {@code params} argument. For best results, all chunks should be {@code O(1)} size. See also {@link ChunkedToXContentHelper} for some
+     * handy utilities.
+     * <p>
+     * Similar to {@link #toXContentChunked} but for the {@link RestApiVersion#V_7} API. By default this method delegates to {@link
+     * #toXContentChunked}.
+     * <p>
+     * Note that chunked response bodies cannot send deprecation warning headers once transmission has started, so implementations must
+     * check for deprecated feature use before returning.
+     *
+     * @return iterator over chunks of {@link ToXContent}
+     */
+    default Iterator<? extends ToXContent> toXContentChunkedV7(ToXContent.Params params) {
+        return toXContentChunked(params);
+    }
+
     /**
      * Wraps the given instance in a {@link ToXContent} that will fully serialize the instance when serialized.
+     *
      * @param chunkedToXContent instance to wrap
      * @return x-content instance
      */
@@ -53,7 +79,7 @@ public interface ChunkedToXContent {
     }
 
     /**
-     * @return true if this instances serializes as an x-content fragment. See {@link ToXContentObject} for additional details.
+     * @return true iff this instance serializes as a fragment. See {@link ToXContentObject} for additional details.
      */
     default boolean isFragment() {
         return true;

+ 4 - 1
server/src/main/java/org/elasticsearch/rest/ChunkedRestResponseBody.java

@@ -15,6 +15,7 @@ import org.elasticsearch.common.recycler.Recycler;
 import org.elasticsearch.common.xcontent.ChunkedToXContent;
 import org.elasticsearch.core.IOUtils;
 import org.elasticsearch.core.Releasables;
+import org.elasticsearch.core.RestApiVersion;
 import org.elasticsearch.core.Streams;
 import org.elasticsearch.xcontent.ToXContent;
 import org.elasticsearch.xcontent.XContentBuilder;
@@ -82,7 +83,9 @@ public interface ChunkedRestResponseBody {
                 Streams.noCloseStream(out)
             );
 
-            private final Iterator<? extends ToXContent> serialization = chunkedToXContent.toXContentChunked(params);
+            private final Iterator<? extends ToXContent> serialization = builder.getRestApiVersion() == RestApiVersion.V_7
+                ? chunkedToXContent.toXContentChunkedV7(params)
+                : chunkedToXContent.toXContentChunked(params);
 
             private BytesStream target;
 

+ 2 - 2
server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestClusterRerouteAction.java

@@ -18,7 +18,7 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.settings.SettingsFilter;
 import org.elasticsearch.rest.BaseRestHandler;
 import org.elasticsearch.rest.RestRequest;
-import org.elasticsearch.rest.action.RestToXContentListener;
+import org.elasticsearch.rest.action.RestChunkedToXContentListener;
 import org.elasticsearch.xcontent.ObjectParser;
 import org.elasticsearch.xcontent.ObjectParser.ValueType;
 import org.elasticsearch.xcontent.ParseField;
@@ -82,7 +82,7 @@ public class RestClusterRerouteAction extends BaseRestHandler {
         if (metric == null) {
             request.params().put("metric", DEFAULT_METRICS);
         }
-        return channel -> client.admin().cluster().reroute(clusterRerouteRequest, new RestToXContentListener<>(channel));
+        return channel -> client.admin().cluster().reroute(clusterRerouteRequest, new RestChunkedToXContentListener<>(channel));
     }
 
     @Override

+ 125 - 99
server/src/test/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteResponseTests.java

@@ -22,18 +22,20 @@ import org.elasticsearch.cluster.routing.allocation.decider.Decision;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.common.xcontent.ChunkedToXContent;
 import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.test.AbstractChunkedSerializingTestCase;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.xcontent.ToXContent;
-import org.elasticsearch.xcontent.json.JsonXContent;
 
 import java.io.IOException;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
 import static org.elasticsearch.common.util.CollectionUtils.appendToCopy;
-import static org.hamcrest.Matchers.equalTo;
+import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
 
 public class ClusterRerouteResponseTests extends ESTestCase {
 
@@ -43,54 +45,45 @@ public class ClusterRerouteResponseTests extends ESTestCase {
     }
 
     public void testToXContent() throws IOException {
-        var clusterState = createClusterState();
-        var clusterRerouteResponse = createClusterRerouteResponse(clusterState);
-
-        var result = toXContent(clusterRerouteResponse, new ToXContent.MapParams(Map.of("metric", "none")));
-
-        assertThat(result, equalTo(XContentHelper.stripWhitespace("""
+        assertXContent(createClusterRerouteResponse(createClusterState()), new ToXContent.MapParams(Map.of("metric", "none")), 2, """
             {
               "acknowledged": true
-            }""")));
+            }""");
     }
 
-    public void testToXContentWithExplain() throws IOException {
+    public void testToXContentWithExplain() {
         var clusterState = createClusterState();
-        var clusterRerouteResponse = createClusterRerouteResponse(clusterState);
-
-        var result = toXContent(clusterRerouteResponse, new ToXContent.MapParams(Map.of("explain", "true", "metric", "none")));
-
-        assertThat(result, equalTo(XContentHelper.stripWhitespace(Strings.format("""
-            {
-              "acknowledged": true,
-              "explanations": [
+        assertXContent(
+            createClusterRerouteResponse(clusterState),
+            new ToXContent.MapParams(Map.of("explain", "true", "metric", "none")),
+            2,
+            Strings.format("""
                 {
-                  "command": "allocate_replica",
-                  "parameters": {
-                    "index": "index",
-                    "shard": 0,
-                    "node": "node0"
-                  },
-                  "decisions": [
+                  "acknowledged": true,
+                  "explanations": [
                     {
-                      "decider": null,
-                      "decision": "YES",
-                      "explanation": "none"
+                      "command": "allocate_replica",
+                      "parameters": {
+                        "index": "index",
+                        "shard": 0,
+                        "node": "node0"
+                      },
+                      "decisions": [
+                        {
+                          "decider": null,
+                          "decision": "YES",
+                          "explanation": "none"
+                        }
+                      ]
                     }
                   ]
-                }
-              ]
-            }""", clusterState.stateUUID()))));
-
+                }""", clusterState.stateUUID())
+        );
     }
 
-    public void testToXContentWithDeprecatedClusterState() throws IOException {
+    public void testToXContentWithDeprecatedClusterState() {
         var clusterState = createClusterState();
-        var clusterRerouteResponse = createClusterRerouteResponse(clusterState);
-
-        var result = toXContent(clusterRerouteResponse, ToXContent.EMPTY_PARAMS);
-
-        assertThat(result, equalTo(XContentHelper.stripWhitespace(Strings.format("""
+        assertXContent(createClusterRerouteResponse(clusterState), ToXContent.EMPTY_PARAMS, 32, Strings.format("""
             {
               "acknowledged": true,
               "state": {
@@ -185,77 +178,110 @@ public class ClusterRerouteResponseTests extends ESTestCase {
                   }
                 }
               }
-            }""", clusterState.stateUUID(), clusterState.getNodes().get("node0").getEphemeralId(), Version.CURRENT.id))));
+            }""", clusterState.stateUUID(), clusterState.getNodes().get("node0").getEphemeralId(), Version.CURRENT.id), """
+            The [state] field in the response to the reroute API is deprecated and will be removed in a future version. \
+            Specify ?metric=none to adopt the future behaviour.""");
     }
 
-    public void testToXContentWithDeprecatedClusterStateAndMetadata() throws IOException {
-        var clusterState = createClusterState();
-        var clusterRerouteResponse = createClusterRerouteResponse(clusterState);
-
-        var result = toXContent(
-            clusterRerouteResponse,
-            new ToXContent.MapParams(Map.of("metric", "metadata", "settings_filter", "index.number*,index.version.created"))
-        );
-
-        assertThat(result, equalTo(XContentHelper.stripWhitespace("""
-            {
-              "acknowledged" : true,
-              "state" : {
-                "cluster_uuid" : "_na_",
-                "metadata" : {
-                  "cluster_uuid" : "_na_",
-                  "cluster_uuid_committed" : false,
-                  "cluster_coordination" : {
-                    "term" : 0,
-                    "last_committed_config" : [ ],
-                    "last_accepted_config" : [ ],
-                    "voting_config_exclusions" : [ ]
-                  },
-                  "templates" : { },
-                  "indices" : {
-                    "index" : {
-                      "version" : 1,
-                      "mapping_version" : 1,
-                      "settings_version" : 1,
-                      "aliases_version" : 1,
-                      "routing_num_shards" : 1,
-                      "state" : "open",
-                      "settings" : {
+    public void testToXContentWithDeprecatedClusterStateAndMetadata() {
+        assertXContent(
+            createClusterRerouteResponse(createClusterState()),
+            new ToXContent.MapParams(Map.of("metric", "metadata", "settings_filter", "index.number*,index.version.created")),
+            19,
+            """
+                {
+                  "acknowledged" : true,
+                  "state" : {
+                    "cluster_uuid" : "_na_",
+                    "metadata" : {
+                      "cluster_uuid" : "_na_",
+                      "cluster_uuid_committed" : false,
+                      "cluster_coordination" : {
+                        "term" : 0,
+                        "last_committed_config" : [ ],
+                        "last_accepted_config" : [ ],
+                        "voting_config_exclusions" : [ ]
+                      },
+                      "templates" : { },
+                      "indices" : {
                         "index" : {
-                          "max_script_fields" : "10",
-                          "shard" : {
-                            "check_on_startup" : "true"
+                          "version" : 1,
+                          "mapping_version" : 1,
+                          "settings_version" : 1,
+                          "aliases_version" : 1,
+                          "routing_num_shards" : 1,
+                          "state" : "open",
+                          "settings" : {
+                            "index" : {
+                              "max_script_fields" : "10",
+                              "shard" : {
+                                "check_on_startup" : "true"
+                              }
+                            }
+                          },
+                          "mappings" : { },
+                          "aliases" : [ ],
+                          "primary_terms" : {
+                            "0" : 0
+                          },
+                          "in_sync_allocations" : {
+                            "0" : [ ]
+                          },
+                          "rollover_info" : { },
+                          "system" : false,
+                          "timestamp_range" : {
+                            "shards" : [ ]
                           }
                         }
                       },
-                      "mappings" : { },
-                      "aliases" : [ ],
-                      "primary_terms" : {
-                        "0" : 0
-                      },
-                      "in_sync_allocations" : {
-                        "0" : [ ]
+                      "index-graveyard" : {
+                        "tombstones" : [ ]
                       },
-                      "rollover_info" : { },
-                      "system" : false,
-                      "timestamp_range" : {
-                        "shards" : [ ]
-                      }
+                      "reserved_state":{}
                     }
-                  },
-                  "index-graveyard" : {
-                    "tombstones" : [ ]
-                  },
-                  "reserved_state":{}
-                }
-              }
-            }""")));
+                  }
+                }""",
+            """
+                The [state] field in the response to the reroute API is deprecated and will be removed in a future version. \
+                Specify ?metric=none to adopt the future behaviour."""
+        );
     }
 
-    private static String toXContent(ClusterRerouteResponse clusterRerouteResponse, ToXContent.Params params) throws IOException {
-        var builder = JsonXContent.contentBuilder().prettyPrint();
-        clusterRerouteResponse.toXContent(builder, params);
-        return XContentHelper.stripWhitespace(Strings.toString(builder));
+    private void assertXContent(
+        ClusterRerouteResponse response,
+        ToXContent.Params params,
+        int expectedChunks,
+        String expectedBody,
+        String... criticalDeprecationWarnings
+    ) {
+        try {
+            var builder = jsonBuilder();
+            if (randomBoolean()) {
+                builder.prettyPrint();
+            }
+            ChunkedToXContent.wrapAsToXContent(response).toXContent(builder, params);
+            assertEquals(XContentHelper.stripWhitespace(expectedBody), XContentHelper.stripWhitespace(Strings.toString(builder)));
+        } catch (IOException e) {
+            throw new AssertionError("unexpected", e);
+        }
+
+        AbstractChunkedSerializingTestCase.assertChunkCount(response, params, ignored -> expectedChunks);
+        assertCriticalWarnings(criticalDeprecationWarnings);
+
+        // check the v7 API too
+        AbstractChunkedSerializingTestCase.assertChunkCount(new ChunkedToXContent() {
+            @Override
+            public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params outerParams) {
+                return response.toXContentChunkedV7(outerParams);
+            }
+
+            @Override
+            public boolean isFragment() {
+                return response.isFragment();
+            }
+        }, params, ignored -> expectedChunks);
+        // the v7 API should not emit any deprecation warnings
+        assertCriticalWarnings();
     }
 
     private static ClusterRerouteResponse createClusterRerouteResponse(ClusterState clusterState) {

+ 2 - 2
test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java

@@ -27,7 +27,7 @@ import org.elasticsearch.action.search.ShardSearchFailure;
 import org.elasticsearch.action.support.DefaultShardOperationFailedException;
 import org.elasticsearch.action.support.broadcast.BaseBroadcastResponse;
 import org.elasticsearch.action.support.master.AcknowledgedRequestBuilder;
-import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.action.support.master.IsAcknowledgedSupplier;
 import org.elasticsearch.cluster.block.ClusterBlock;
 import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
@@ -99,7 +99,7 @@ public class ElasticsearchAssertions {
         assertThat("ClusterHealthResponse has timed out - returned: [" + response + "]", response.isTimedOut(), is(false));
     }
 
-    public static void assertAcked(AcknowledgedResponse response) {
+    public static void assertAcked(IsAcknowledgedSupplier response) {
         assertThat(response.getClass().getSimpleName() + " failed - not acked", response.isAcknowledged(), equalTo(true));
     }