Browse Source

[HLRC] Added support for CCR Put Follow API (#35409)

This change also adds documentation for the Put Follow API and
adds a CCR HLRC integration test.
Martijn van Groningen 7 years ago
parent
commit
e81671dd45

+ 47 - 0
client/rest-high-level/src/main/java/org/elasticsearch/client/CcrClient.java

@@ -21,6 +21,8 @@ package org.elasticsearch.client;
 
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.client.ccr.PauseFollowRequest;
+import org.elasticsearch.client.ccr.PutFollowRequest;
+import org.elasticsearch.client.ccr.PutFollowResponse;
 import org.elasticsearch.client.core.AcknowledgedResponse;
 
 import java.io.IOException;
@@ -41,6 +43,51 @@ public final class CcrClient {
         this.restHighLevelClient = restHighLevelClient;
     }
 
+    /**
+     * Executes the put follow api, which creates a follower index and then the follower index starts following
+     * the leader index.
+     *
+     * See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/ccr-put-follow.html">
+     * the docs</a> for more.
+     *
+     * @param request the request
+     * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
+     * @return the response
+     * @throws IOException in case there is a problem sending the request or parsing back the response
+     */
+    public PutFollowResponse putFollow(PutFollowRequest request, RequestOptions options) throws IOException {
+        return restHighLevelClient.performRequestAndParseEntity(
+            request,
+            CcrRequestConverters::putFollow,
+            options,
+            PutFollowResponse::fromXContent,
+            Collections.emptySet()
+        );
+    }
+
+    /**
+     * Asynchronously executes the put follow api, which creates a follower index and then the follower index starts
+     * following the leader index.
+     *
+     * See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/ccr-put-follow.html">
+     * the docs</a> for more.
+     *
+     * @param request the request
+     * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
+     */
+    public void putFollowAsync(PutFollowRequest request,
+                               RequestOptions options,
+                               ActionListener<PutFollowResponse> listener) {
+        restHighLevelClient.performRequestAsyncAndParseEntity(
+            request,
+            CcrRequestConverters::putFollow,
+            options,
+            PutFollowResponse::fromXContent,
+            listener,
+            Collections.emptySet()
+        );
+    }
+
     /**
      * Instructs a follower index the pause the following of a leader index.
      *

+ 17 - 0
client/rest-high-level/src/main/java/org/elasticsearch/client/CcrRequestConverters.java

@@ -20,10 +20,27 @@
 package org.elasticsearch.client;
 
 import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
 import org.elasticsearch.client.ccr.PauseFollowRequest;
+import org.elasticsearch.client.ccr.PutFollowRequest;
+
+import java.io.IOException;
+
+import static org.elasticsearch.client.RequestConverters.REQUEST_BODY_CONTENT_TYPE;
+import static org.elasticsearch.client.RequestConverters.createEntity;
 
 final class CcrRequestConverters {
 
+    static Request putFollow(PutFollowRequest putFollowRequest) throws IOException {
+        String endpoint = new RequestConverters.EndpointBuilder()
+            .addPathPart(putFollowRequest.getFollowerIndex())
+            .addPathPartAsIs("_ccr", "follow")
+            .build();
+        Request request = new Request(HttpPut.METHOD_NAME, endpoint);
+        request.setEntity(createEntity(putFollowRequest, REQUEST_BODY_CONTENT_TYPE));
+        return request;
+    }
+
     static Request pauseFollow(PauseFollowRequest pauseFollowRequest) {
         String endpoint = new RequestConverters.EndpointBuilder()
             .addPathPart(pauseFollowRequest.getFollowerIndex())

+ 238 - 0
client/rest-high-level/src/main/java/org/elasticsearch/client/ccr/PutFollowRequest.java

@@ -0,0 +1,238 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.client.ccr;
+
+import org.elasticsearch.client.Validatable;
+import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.xcontent.ToXContentObject;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+
+import java.io.IOException;
+import java.util.Objects;
+
+public final class PutFollowRequest implements Validatable, ToXContentObject {
+
+    static final ParseField REMOTE_CLUSTER_FIELD = new ParseField("remote_cluster");
+    static final ParseField LEADER_INDEX_FIELD = new ParseField("leader_index");
+    static final ParseField FOLLOWER_INDEX_FIELD = new ParseField("follower_index");
+    static final ParseField MAX_READ_REQUEST_OPERATION_COUNT = new ParseField("max_read_request_operation_count");
+    static final ParseField MAX_READ_REQUEST_SIZE = new ParseField("max_read_request_size");
+    static final ParseField MAX_OUTSTANDING_READ_REQUESTS = new ParseField("max_outstanding_read_requests");
+    static final ParseField MAX_WRITE_REQUEST_OPERATION_COUNT = new ParseField("max_write_request_operation_count");
+    static final ParseField MAX_WRITE_REQUEST_SIZE = new ParseField("max_write_request_size");
+    static final ParseField MAX_OUTSTANDING_WRITE_REQUESTS = new ParseField("max_outstanding_write_requests");
+    static final ParseField MAX_WRITE_BUFFER_COUNT = new ParseField("max_write_buffer_count");
+    static final ParseField MAX_WRITE_BUFFER_SIZE = new ParseField("max_write_buffer_size");
+    static final ParseField MAX_RETRY_DELAY_FIELD = new ParseField("max_retry_delay");
+    static final ParseField READ_POLL_TIMEOUT = new ParseField("read_poll_timeout");
+
+    private final String remoteCluster;
+    private final String leaderIndex;
+    private final String followerIndex;
+    private Integer maxReadRequestOperationCount;
+    private Integer maxOutstandingReadRequests;
+    private ByteSizeValue maxReadRequestSize;
+    private Integer maxWriteRequestOperationCount;
+    private ByteSizeValue maxWriteRequestSize;
+    private Integer maxOutstandingWriteRequests;
+    private Integer maxWriteBufferCount;
+    private ByteSizeValue maxWriteBufferSize;
+    private TimeValue maxRetryDelay;
+    private TimeValue readPollTimeout;
+
+    public PutFollowRequest(String remoteCluster, String leaderIndex, String followerIndex) {
+        this.remoteCluster = Objects.requireNonNull(remoteCluster, "remoteCluster");
+        this.leaderIndex = Objects.requireNonNull(leaderIndex, "leaderIndex");
+        this.followerIndex = Objects.requireNonNull(followerIndex, "followerIndex");
+    }
+
+    @Override
+    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        builder.startObject();
+        builder.field(REMOTE_CLUSTER_FIELD.getPreferredName(), remoteCluster);
+        builder.field(LEADER_INDEX_FIELD.getPreferredName(), leaderIndex);
+        builder.field(FOLLOWER_INDEX_FIELD.getPreferredName(), followerIndex);
+        if (maxReadRequestOperationCount != null) {
+            builder.field(MAX_READ_REQUEST_OPERATION_COUNT.getPreferredName(), maxReadRequestOperationCount);
+        }
+        if (maxReadRequestSize != null) {
+            builder.field(MAX_READ_REQUEST_SIZE.getPreferredName(), maxReadRequestSize.getStringRep());
+        }
+        if (maxWriteRequestOperationCount != null) {
+            builder.field(MAX_WRITE_REQUEST_OPERATION_COUNT.getPreferredName(), maxWriteRequestOperationCount);
+        }
+        if (maxWriteRequestSize != null) {
+            builder.field(MAX_WRITE_REQUEST_SIZE.getPreferredName(), maxWriteRequestSize.getStringRep());
+        }
+        if (maxWriteBufferCount != null) {
+            builder.field(MAX_WRITE_BUFFER_COUNT.getPreferredName(), maxWriteBufferCount);
+        }
+        if (maxWriteBufferSize != null) {
+            builder.field(MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize.getStringRep());
+        }
+        if (maxOutstandingReadRequests != null) {
+            builder.field(MAX_OUTSTANDING_READ_REQUESTS.getPreferredName(), maxOutstandingReadRequests);
+        }
+        if (maxOutstandingWriteRequests != null) {
+            builder.field(MAX_OUTSTANDING_WRITE_REQUESTS.getPreferredName(), maxOutstandingWriteRequests);
+        }
+        if (maxRetryDelay != null) {
+            builder.field(MAX_RETRY_DELAY_FIELD.getPreferredName(), maxRetryDelay.getStringRep());
+        }
+        if (readPollTimeout != null) {
+            builder.field(READ_POLL_TIMEOUT.getPreferredName(), readPollTimeout.getStringRep());
+        }
+        builder.endObject();
+        return builder;
+    }
+
+    public String getRemoteCluster() {
+        return remoteCluster;
+    }
+
+    public String getLeaderIndex() {
+        return leaderIndex;
+    }
+
+    public String getFollowerIndex() {
+        return followerIndex;
+    }
+
+    public Integer getMaxReadRequestOperationCount() {
+        return maxReadRequestOperationCount;
+    }
+
+    public void setMaxReadRequestOperationCount(Integer maxReadRequestOperationCount) {
+        this.maxReadRequestOperationCount = maxReadRequestOperationCount;
+    }
+
+    public Integer getMaxOutstandingReadRequests() {
+        return maxOutstandingReadRequests;
+    }
+
+    public void setMaxOutstandingReadRequests(Integer maxOutstandingReadRequests) {
+        this.maxOutstandingReadRequests = maxOutstandingReadRequests;
+    }
+
+    public ByteSizeValue getMaxReadRequestSize() {
+        return maxReadRequestSize;
+    }
+
+    public void setMaxReadRequestSize(ByteSizeValue maxReadRequestSize) {
+        this.maxReadRequestSize = maxReadRequestSize;
+    }
+
+    public Integer getMaxWriteRequestOperationCount() {
+        return maxWriteRequestOperationCount;
+    }
+
+    public void setMaxWriteRequestOperationCount(Integer maxWriteRequestOperationCount) {
+        this.maxWriteRequestOperationCount = maxWriteRequestOperationCount;
+    }
+
+    public ByteSizeValue getMaxWriteRequestSize() {
+        return maxWriteRequestSize;
+    }
+
+    public void setMaxWriteRequestSize(ByteSizeValue maxWriteRequestSize) {
+        this.maxWriteRequestSize = maxWriteRequestSize;
+    }
+
+    public Integer getMaxOutstandingWriteRequests() {
+        return maxOutstandingWriteRequests;
+    }
+
+    public void setMaxOutstandingWriteRequests(Integer maxOutstandingWriteRequests) {
+        this.maxOutstandingWriteRequests = maxOutstandingWriteRequests;
+    }
+
+    public Integer getMaxWriteBufferCount() {
+        return maxWriteBufferCount;
+    }
+
+    public void setMaxWriteBufferCount(Integer maxWriteBufferCount) {
+        this.maxWriteBufferCount = maxWriteBufferCount;
+    }
+
+    public ByteSizeValue getMaxWriteBufferSize() {
+        return maxWriteBufferSize;
+    }
+
+    public void setMaxWriteBufferSize(ByteSizeValue maxWriteBufferSize) {
+        this.maxWriteBufferSize = maxWriteBufferSize;
+    }
+
+    public TimeValue getMaxRetryDelay() {
+        return maxRetryDelay;
+    }
+
+    public void setMaxRetryDelay(TimeValue maxRetryDelay) {
+        this.maxRetryDelay = maxRetryDelay;
+    }
+
+    public TimeValue getReadPollTimeout() {
+        return readPollTimeout;
+    }
+
+    public void setReadPollTimeout(TimeValue readPollTimeout) {
+        this.readPollTimeout = readPollTimeout;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        PutFollowRequest that = (PutFollowRequest) o;
+        return Objects.equals(remoteCluster, that.remoteCluster) &&
+            Objects.equals(leaderIndex, that.leaderIndex) &&
+            Objects.equals(followerIndex, that.followerIndex) &&
+            Objects.equals(maxReadRequestOperationCount, that.maxReadRequestOperationCount) &&
+            Objects.equals(maxOutstandingReadRequests, that.maxOutstandingReadRequests) &&
+            Objects.equals(maxReadRequestSize, that.maxReadRequestSize) &&
+            Objects.equals(maxWriteRequestOperationCount, that.maxWriteRequestOperationCount) &&
+            Objects.equals(maxWriteRequestSize, that.maxWriteRequestSize) &&
+            Objects.equals(maxOutstandingWriteRequests, that.maxOutstandingWriteRequests) &&
+            Objects.equals(maxWriteBufferCount, that.maxWriteBufferCount) &&
+            Objects.equals(maxWriteBufferSize, that.maxWriteBufferSize) &&
+            Objects.equals(maxRetryDelay, that.maxRetryDelay) &&
+            Objects.equals(readPollTimeout, that.readPollTimeout);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+            remoteCluster,
+            leaderIndex,
+            followerIndex,
+            maxReadRequestOperationCount,
+            maxOutstandingReadRequests,
+            maxReadRequestSize,
+            maxWriteRequestOperationCount,
+            maxWriteRequestSize,
+            maxOutstandingWriteRequests,
+            maxWriteBufferCount,
+            maxWriteBufferSize,
+            maxRetryDelay,
+            readPollTimeout
+        );
+    }
+}

+ 84 - 0
client/rest-high-level/src/main/java/org/elasticsearch/client/ccr/PutFollowResponse.java

@@ -0,0 +1,84 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.client.ccr;
+
+import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.xcontent.ConstructingObjectParser;
+import org.elasticsearch.common.xcontent.XContentParser;
+
+import java.io.IOException;
+import java.util.Objects;
+
+public final class PutFollowResponse {
+
+    static final ParseField FOLLOW_INDEX_CREATED = new ParseField("follow_index_created");
+    static final ParseField FOLLOW_INDEX_SHARDS_ACKED = new ParseField("follow_index_shards_acked");
+    static final ParseField INDEX_FOLLOWING_STARTED = new ParseField("index_following_started");
+
+    private static final ConstructingObjectParser<PutFollowResponse, Void> PARSER = new ConstructingObjectParser<>(
+        "put_follow_response", args -> new PutFollowResponse((boolean) args[0], (boolean) args[1], (boolean) args[2]));
+
+    static {
+        PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), FOLLOW_INDEX_CREATED);
+        PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), FOLLOW_INDEX_SHARDS_ACKED);
+        PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), INDEX_FOLLOWING_STARTED);
+    }
+
+    public static PutFollowResponse fromXContent(final XContentParser parser) throws IOException {
+        return PARSER.parse(parser, null);
+    }
+
+    private final boolean followIndexCreated;
+    private final boolean followIndexShardsAcked;
+    private final boolean indexFollowingStarted;
+
+    public PutFollowResponse(boolean followIndexCreated, boolean followIndexShardsAcked, boolean indexFollowingStarted) {
+        this.followIndexCreated = followIndexCreated;
+        this.followIndexShardsAcked = followIndexShardsAcked;
+        this.indexFollowingStarted = indexFollowingStarted;
+    }
+
+    public boolean isFollowIndexCreated() {
+        return followIndexCreated;
+    }
+
+    public boolean isFollowIndexShardsAcked() {
+        return followIndexShardsAcked;
+    }
+
+    public boolean isIndexFollowingStarted() {
+        return indexFollowingStarted;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        PutFollowResponse that = (PutFollowResponse) o;
+        return followIndexCreated == that.followIndexCreated &&
+            followIndexShardsAcked == that.followIndexShardsAcked &&
+            indexFollowingStarted == that.indexFollowingStarted;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(followIndexCreated, followIndexShardsAcked, indexFollowingStarted);
+    }
+}

+ 104 - 0
client/rest-high-level/src/test/java/org/elasticsearch/client/CCRIT.java

@@ -0,0 +1,104 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.client;
+
+import org.apache.http.util.EntityUtils;
+import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
+import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
+import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
+import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.support.WriteRequest;
+import org.elasticsearch.client.ccr.PauseFollowRequest;
+import org.elasticsearch.client.ccr.PutFollowRequest;
+import org.elasticsearch.client.ccr.PutFollowResponse;
+import org.elasticsearch.client.core.AcknowledgedResponse;
+import org.elasticsearch.common.xcontent.XContentHelper;
+import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.common.xcontent.json.JsonXContent;
+import org.junit.Before;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+
+public class CCRIT extends ESRestHighLevelClientTestCase {
+
+    @Before
+    public void setupRemoteClusterConfig() throws IOException {
+        // Configure local cluster as remote cluster:
+        // TODO: replace with nodes info highlevel rest client code when it is available:
+        final Request request = new Request("GET", "/_nodes");
+        Map<?, ?> nodesResponse = (Map<?, ?>) toMap(client().performRequest(request)).get("nodes");
+        // Select node info of first node (we don't know the node id):
+        nodesResponse = (Map<?, ?>) nodesResponse.get(nodesResponse.keySet().iterator().next());
+        String transportAddress = (String) nodesResponse.get("transport_address");
+
+        ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
+        updateSettingsRequest.transientSettings(Collections.singletonMap("cluster.remote.local.seeds", transportAddress));
+        ClusterUpdateSettingsResponse updateSettingsResponse =
+            highLevelClient().cluster().putSettings(updateSettingsRequest, RequestOptions.DEFAULT);
+        assertThat(updateSettingsResponse.isAcknowledged(), is(true));
+    }
+
+    public void testCCR() throws Exception {
+        CcrClient ccrClient = highLevelClient().ccr();
+
+        CreateIndexRequest createIndexRequest = new CreateIndexRequest("leader");
+        createIndexRequest.settings(Collections.singletonMap("index.soft_deletes.enabled", true));
+        CreateIndexResponse response = highLevelClient().indices().create(createIndexRequest, RequestOptions.DEFAULT);
+        assertThat(response.isAcknowledged(), is(true));
+
+        PutFollowRequest putFollowRequest = new PutFollowRequest("local", "leader", "follower");
+        PutFollowResponse putFollowResponse = execute(putFollowRequest, ccrClient::putFollow, ccrClient::putFollowAsync);
+        assertThat(putFollowResponse.isFollowIndexCreated(), is(true));
+        assertThat(putFollowResponse.isFollowIndexShardsAcked(), is(true));
+        assertThat(putFollowResponse.isIndexFollowingStarted(), is(true));
+
+        IndexRequest indexRequest = new IndexRequest("leader", "_doc")
+            .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
+            .source("{}", XContentType.JSON);
+        highLevelClient().index(indexRequest, RequestOptions.DEFAULT);
+
+        SearchRequest leaderSearchRequest = new SearchRequest("leader");
+        SearchResponse leaderSearchResponse = highLevelClient().search(leaderSearchRequest, RequestOptions.DEFAULT);
+        assertThat(leaderSearchResponse.getHits().getTotalHits(), equalTo(1L));
+
+        assertBusy(() -> {
+            SearchRequest followerSearchRequest = new SearchRequest("follower");
+            SearchResponse followerSearchResponse = highLevelClient().search(followerSearchRequest, RequestOptions.DEFAULT);
+            assertThat(followerSearchResponse.getHits().getTotalHits(), equalTo(1L));
+        });
+
+        PauseFollowRequest pauseFollowRequest = new PauseFollowRequest("follower");
+        AcknowledgedResponse pauseFollowResponse = execute(pauseFollowRequest, ccrClient::pauseFollow, ccrClient::pauseFollowAsync);
+        assertThat(pauseFollowResponse.isAcknowledged(), is(true));
+    }
+
+    private static Map<String, Object> toMap(Response response) throws IOException {
+        return XContentHelper.convertToMap(JsonXContent.jsonXContent, EntityUtils.toString(response.getEntity()), false);
+    }
+
+}

+ 119 - 0
client/rest-high-level/src/test/java/org/elasticsearch/client/ccr/PutFollowRequestTests.java

@@ -0,0 +1,119 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.client.ccr;
+
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.xcontent.ConstructingObjectParser;
+import org.elasticsearch.common.xcontent.ObjectParser;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.test.AbstractXContentTestCase;
+
+import java.io.IOException;
+
+public class PutFollowRequestTests extends AbstractXContentTestCase<PutFollowRequest> {
+
+    private static final ConstructingObjectParser<PutFollowRequest, Void> PARSER = new ConstructingObjectParser<>("test_parser",
+        (args) -> new PutFollowRequest((String) args[0], (String) args[1], (String) args[2]));
+
+    static {
+        PARSER.declareString(ConstructingObjectParser.constructorArg(), PutFollowRequest.REMOTE_CLUSTER_FIELD);
+        PARSER.declareString(ConstructingObjectParser.constructorArg(), PutFollowRequest.LEADER_INDEX_FIELD);
+        PARSER.declareString(ConstructingObjectParser.constructorArg(), PutFollowRequest.FOLLOWER_INDEX_FIELD);
+        PARSER.declareInt(PutFollowRequest::setMaxReadRequestOperationCount, PutFollowRequest.MAX_READ_REQUEST_OPERATION_COUNT);
+        PARSER.declareField(
+            PutFollowRequest::setMaxReadRequestSize,
+            (p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), PutFollowRequest.MAX_READ_REQUEST_SIZE.getPreferredName()),
+            PutFollowRequest.MAX_READ_REQUEST_SIZE,
+            ObjectParser.ValueType.STRING);
+        PARSER.declareInt(PutFollowRequest::setMaxOutstandingReadRequests, PutFollowRequest.MAX_OUTSTANDING_READ_REQUESTS);
+        PARSER.declareInt(PutFollowRequest::setMaxWriteRequestOperationCount, PutFollowRequest.MAX_WRITE_REQUEST_OPERATION_COUNT);
+        PARSER.declareField(
+            PutFollowRequest::setMaxWriteRequestSize,
+            (p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), PutFollowRequest.MAX_WRITE_REQUEST_SIZE.getPreferredName()),
+            PutFollowRequest.MAX_WRITE_REQUEST_SIZE,
+            ObjectParser.ValueType.STRING);
+        PARSER.declareInt(PutFollowRequest::setMaxOutstandingWriteRequests, PutFollowRequest.MAX_OUTSTANDING_WRITE_REQUESTS);
+        PARSER.declareInt(PutFollowRequest::setMaxWriteBufferCount, PutFollowRequest.MAX_WRITE_BUFFER_COUNT);
+        PARSER.declareField(
+            PutFollowRequest::setMaxWriteBufferSize,
+            (p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), PutFollowRequest.MAX_WRITE_BUFFER_SIZE.getPreferredName()),
+            PutFollowRequest.MAX_WRITE_BUFFER_SIZE,
+            ObjectParser.ValueType.STRING);
+        PARSER.declareField(
+            PutFollowRequest::setMaxRetryDelay,
+            (p, c) -> TimeValue.parseTimeValue(p.text(), PutFollowRequest.MAX_RETRY_DELAY_FIELD.getPreferredName()),
+            PutFollowRequest.MAX_RETRY_DELAY_FIELD,
+            ObjectParser.ValueType.STRING);
+        PARSER.declareField(
+            PutFollowRequest::setReadPollTimeout,
+            (p, c) -> TimeValue.parseTimeValue(p.text(), PutFollowRequest.READ_POLL_TIMEOUT.getPreferredName()),
+            PutFollowRequest.READ_POLL_TIMEOUT,
+            ObjectParser.ValueType.STRING);
+    }
+
+    @Override
+    protected PutFollowRequest doParseInstance(XContentParser parser) throws IOException {
+        return PARSER.apply(parser, null);
+    }
+
+    @Override
+    protected boolean supportsUnknownFields() {
+        return false;
+    }
+
+    @Override
+    protected PutFollowRequest createTestInstance() {
+        PutFollowRequest putFollowRequest =
+            new PutFollowRequest(randomAlphaOfLength(4), randomAlphaOfLength(4), randomAlphaOfLength(4));
+        if (randomBoolean()) {
+            putFollowRequest.setMaxOutstandingReadRequests(randomIntBetween(0, Integer.MAX_VALUE));
+        }
+        if (randomBoolean()) {
+            putFollowRequest.setMaxOutstandingWriteRequests(randomIntBetween(0, Integer.MAX_VALUE));
+        }
+        if (randomBoolean()) {
+            putFollowRequest.setMaxReadRequestOperationCount(randomIntBetween(0, Integer.MAX_VALUE));
+        }
+        if (randomBoolean()) {
+            putFollowRequest.setMaxReadRequestSize(new ByteSizeValue(randomNonNegativeLong()));
+        }
+        if (randomBoolean()) {
+            putFollowRequest.setMaxWriteBufferCount(randomIntBetween(0, Integer.MAX_VALUE));
+        }
+        if (randomBoolean()) {
+            putFollowRequest.setMaxWriteBufferSize(new ByteSizeValue(randomNonNegativeLong()));
+        }
+        if (randomBoolean()) {
+            putFollowRequest.setMaxWriteRequestOperationCount(randomIntBetween(0, Integer.MAX_VALUE));
+        }
+        if (randomBoolean()) {
+            putFollowRequest.setMaxWriteRequestSize(new ByteSizeValue(randomNonNegativeLong()));
+        }
+        if (randomBoolean()) {
+            putFollowRequest.setMaxRetryDelay(new TimeValue(randomNonNegativeLong()));
+        }
+        if (randomBoolean()) {
+            putFollowRequest.setReadPollTimeout(new TimeValue(randomNonNegativeLong()));
+        }
+        return putFollowRequest;
+    }
+
+}

+ 53 - 0
client/rest-high-level/src/test/java/org/elasticsearch/client/ccr/PutFollowResponseTests.java

@@ -0,0 +1,53 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.client.ccr;
+
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.test.ESTestCase;
+
+import java.io.IOException;
+
+import static org.elasticsearch.test.AbstractXContentTestCase.xContentTester;
+
+public class PutFollowResponseTests extends ESTestCase {
+
+    public void testFromXContent() throws IOException {
+        xContentTester(this::createParser,
+            this::createTestInstance,
+            PutFollowResponseTests::toXContent,
+            PutFollowResponse::fromXContent)
+            .supportsUnknownFields(false)
+            .test();
+    }
+
+    private PutFollowResponse createTestInstance() {
+        return new PutFollowResponse(randomBoolean(), randomBoolean(), randomBoolean());
+    }
+
+    public static void toXContent(PutFollowResponse response, XContentBuilder builder) throws IOException {
+        builder.startObject();
+        {
+            builder.field(PutFollowResponse.FOLLOW_INDEX_CREATED.getPreferredName(), response.isFollowIndexCreated());
+            builder.field(PutFollowResponse.FOLLOW_INDEX_SHARDS_ACKED.getPreferredName(), response.isFollowIndexShardsAcked());
+            builder.field(PutFollowResponse.INDEX_FOLLOWING_STARTED.getPreferredName(), response.isIndexFollowingStarted());
+        }
+        builder.endObject();
+    }
+}

+ 105 - 20
client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CCRDocumentationIT.java

@@ -26,15 +26,19 @@ import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequ
 import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
 import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
 import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
 import org.elasticsearch.client.ESRestHighLevelClientTestCase;
 import org.elasticsearch.client.Request;
 import org.elasticsearch.client.RequestOptions;
 import org.elasticsearch.client.Response;
 import org.elasticsearch.client.RestHighLevelClient;
 import org.elasticsearch.client.ccr.PauseFollowRequest;
+import org.elasticsearch.client.ccr.PutFollowRequest;
+import org.elasticsearch.client.ccr.PutFollowResponse;
 import org.elasticsearch.client.core.AcknowledgedResponse;
 import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.common.xcontent.json.JsonXContent;
+import org.junit.Before;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -47,25 +51,106 @@ import static org.hamcrest.Matchers.is;
 
 public class CCRDocumentationIT extends ESRestHighLevelClientTestCase {
 
-    public void testPauseFollow() throws Exception {
+    @Before
+    public void setupRemoteClusterConfig() throws IOException {
+        RestHighLevelClient client = highLevelClient();
+        // Configure local cluster as remote cluster:
+        // TODO: replace with nodes info highlevel rest client code when it is available:
+        final Request request = new Request("GET", "/_nodes");
+        Map<?, ?> nodesResponse = (Map<?, ?>) toMap(client().performRequest(request)).get("nodes");
+        // Select node info of first node (we don't know the node id):
+        nodesResponse = (Map<?, ?>) nodesResponse.get(nodesResponse.keySet().iterator().next());
+        String transportAddress = (String) nodesResponse.get("transport_address");
+
+        ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
+        updateSettingsRequest.transientSettings(Collections.singletonMap("cluster.remote.local.seeds", transportAddress));
+        ClusterUpdateSettingsResponse updateSettingsResponse =
+            client.cluster().putSettings(updateSettingsRequest, RequestOptions.DEFAULT);
+        assertThat(updateSettingsResponse.isAcknowledged(), is(true));
+    }
+
+    public void testPutFollow() throws Exception {
         RestHighLevelClient client = highLevelClient();
+        {
+            // Create leader index:
+            CreateIndexRequest createIndexRequest = new CreateIndexRequest("leader");
+            createIndexRequest.settings(Collections.singletonMap("index.soft_deletes.enabled", true));
+            CreateIndexResponse response = client.indices().create(createIndexRequest, RequestOptions.DEFAULT);
+            assertThat(response.isAcknowledged(), is(true));
+        }
 
+        // tag::ccr-put-follow-request
+        PutFollowRequest putFollowRequest = new PutFollowRequest(
+            "local", // <1>
+            "leader", // <2>
+            "follower" // <3>
+        );
+        // end::ccr-put-follow-request
+
+        // tag::ccr-put-follow-execute
+        PutFollowResponse putFollowResponse =
+            client.ccr().putFollow(putFollowRequest, RequestOptions.DEFAULT);
+        // end::ccr-put-follow-execute
+
+        // tag::ccr-put-follow-response
+        boolean isFollowIndexCreated =
+            putFollowResponse.isFollowIndexCreated(); // <1>
+        boolean isFollowIndexShardsAcked =
+            putFollowResponse.isFollowIndexShardsAcked(); // <2>
+        boolean isIndexFollowingStarted =
+            putFollowResponse.isIndexFollowingStarted(); // <3>
+        // end::ccr-put-follow-response
+
+        // Pause following and delete follower index, so that we can execute put follow api again:
         {
-            // Configure local cluster as remote cluster:
-
-            // TODO: replace with nodes info highlevel rest client code when it is available:
-            final Request request = new Request("GET", "/_nodes");
-            Map<?, ?> nodesResponse = (Map<?, ?>) toMap(client().performRequest(request)).get("nodes");
-            // Select node info of first node (we don't know the node id):
-            nodesResponse = (Map<?, ?>) nodesResponse.get(nodesResponse.keySet().iterator().next());
-            String transportAddress = (String) nodesResponse.get("transport_address");
-
-            ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
-            updateSettingsRequest.transientSettings(Collections.singletonMap("cluster.remote.local.seeds", transportAddress));
-            ClusterUpdateSettingsResponse updateSettingsResponse =
-                client.cluster().putSettings(updateSettingsRequest, RequestOptions.DEFAULT);
-            assertThat(updateSettingsResponse.isAcknowledged(), is(true));
+            PauseFollowRequest pauseFollowRequest = new PauseFollowRequest("follower");
+            AcknowledgedResponse pauseFollowResponse =  client.ccr().pauseFollow(pauseFollowRequest, RequestOptions.DEFAULT);
+            assertThat(pauseFollowResponse.isAcknowledged(), is(true));
+
+            DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest("follower");
+            assertThat(client.indices().delete(deleteIndexRequest, RequestOptions.DEFAULT).isAcknowledged(), is(true));
         }
+
+        // tag::ccr-put-follow-execute-listener
+        ActionListener<PutFollowResponse> listener =
+            new ActionListener<PutFollowResponse>() {
+                @Override
+                public void onResponse(PutFollowResponse response) { // <1>
+                    boolean isFollowIndexCreated =
+                        putFollowResponse.isFollowIndexCreated();
+                    boolean isFollowIndexShardsAcked =
+                        putFollowResponse.isFollowIndexShardsAcked();
+                    boolean isIndexFollowingStarted =
+                        putFollowResponse.isIndexFollowingStarted();
+                }
+
+                @Override
+                public void onFailure(Exception e) {
+                    // <2>
+                }
+            };
+        // end::ccr-put-follow-execute-listener
+
+        // Replace the empty listener by a blocking listener in test
+        final CountDownLatch latch = new CountDownLatch(1);
+        listener = new LatchedActionListener<>(listener, latch);
+
+        // tag::ccr-put-follow-execute-async
+        client.ccr().putFollowAsync(putFollowRequest,
+            RequestOptions.DEFAULT, listener); // <1>
+        // end::ccr-put-follow-execute-async
+
+        assertTrue(latch.await(30L, TimeUnit.SECONDS));
+
+        {
+            PauseFollowRequest pauseFollowRequest = new PauseFollowRequest("follower");
+            AcknowledgedResponse pauseFollowResponse =  client.ccr().pauseFollow(pauseFollowRequest, RequestOptions.DEFAULT);
+            assertThat(pauseFollowResponse.isAcknowledged(), is(true));
+        }
+    }
+
+    public void testPauseFollow() throws Exception {
+        RestHighLevelClient client = highLevelClient();
         {
             // Create leader index:
             CreateIndexRequest createIndexRequest = new CreateIndexRequest("leader");
@@ -76,11 +161,11 @@ public class CCRDocumentationIT extends ESRestHighLevelClientTestCase {
         String followIndex = "follower";
         // Follow index, so that it can be paused:
         {
-            // TODO: Replace this with high level rest client code when put follow API is available:
-            final Request request = new Request("PUT", "/" + followIndex + "/_ccr/follow");
-            request.setJsonEntity("{\"remote_cluster\": \"local\", \"leader_index\": \"leader\"}");
-            Response response = client().performRequest(request);
-            assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
+            PutFollowRequest putFollowRequest = new PutFollowRequest("local", "leader", followIndex);
+            PutFollowResponse putFollowResponse = client.ccr().putFollow(putFollowRequest, RequestOptions.DEFAULT);
+            assertThat(putFollowResponse.isFollowIndexCreated(), is(true));
+            assertThat(putFollowResponse.isFollowIndexShardsAcked(), is(true));
+            assertThat(putFollowResponse.isIndexFollowingStarted(), is(true));
         }
 
         // tag::ccr-pause-follow-request

+ 39 - 0
docs/java-rest/high-level/ccr/put_follow.asciidoc

@@ -0,0 +1,39 @@
+--
+:api: ccr-put-follow
+:request: PutFollowRequest
+:response: PutFollowResponse
+--
+
+[id="{upid}-{api}"]
+=== Put Follow API
+
+
+[id="{upid}-{api}-request"]
+==== Request
+
+The Put Follow API allows creates a follower index and make that index follow a leader index.
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests-file}[{api}-request]
+--------------------------------------------------
+<1> The name of the remote cluster alias.
+<2> The name of the leader in the remote cluster.
+<3> The name of the follower index that gets created as part of the put follow API call.
+
+[id="{upid}-{api}-response"]
+==== Response
+
+The returned +{response}+ indicates if the put follow request was received.
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests-file}[{api}-response]
+--------------------------------------------------
+<1> Whether the follower index was created.
+<2> Whether the follower shards are started.
+<3> Whether the follower index started following the leader index.
+
+include::../execution.asciidoc[]
+
+

+ 2 - 0
docs/java-rest/high-level/supported-apis.asciidoc

@@ -407,6 +407,8 @@ don't leak into the rest of the documentation.
 
 The Java High Level REST Client supports the following CCR APIs:
 
+* <<{upid}-ccr-put-follow>>
 * <<{upid}-ccr-pause-follow>>
 
+include::ccr/put_follow.asciidoc[]
 include::ccr/pause_follow.asciidoc[]