|
@@ -19,9 +19,8 @@
|
|
|
package org.elasticsearch.backwards;
|
|
|
|
|
|
import org.apache.http.HttpHost;
|
|
|
-import org.apache.http.entity.ContentType;
|
|
|
-import org.apache.http.entity.StringEntity;
|
|
|
import org.elasticsearch.Version;
|
|
|
+import org.elasticsearch.client.Request;
|
|
|
import org.elasticsearch.client.Response;
|
|
|
import org.elasticsearch.client.RestClient;
|
|
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
|
@@ -34,25 +33,21 @@ import org.elasticsearch.test.rest.yaml.ObjectPath;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
import java.util.ArrayList;
|
|
|
-import java.util.Collections;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
-import static com.carrotsearch.randomizedtesting.RandomizedTest.randomAsciiOfLength;
|
|
|
-import static java.util.Collections.emptyMap;
|
|
|
-import static java.util.Collections.singletonMap;
|
|
|
import static org.hamcrest.Matchers.equalTo;
|
|
|
-import static org.hamcrest.Matchers.not;
|
|
|
|
|
|
public class IndexingIT extends ESRestTestCase {
|
|
|
|
|
|
private int indexDocs(String index, final int idStart, final int numDocs) throws IOException {
|
|
|
for (int i = 0; i < numDocs; i++) {
|
|
|
final int id = idStart + i;
|
|
|
- assertOK(client().performRequest("PUT", index + "/test/" + id, emptyMap(),
|
|
|
- new StringEntity("{\"test\": \"test_" + randomAsciiOfLength(2) + "\"}", ContentType.APPLICATION_JSON)));
|
|
|
+ Request request = new Request("PUT", index + "/test/" + id);
|
|
|
+ request.setJsonEntity("{\"test\": \"test_" + randomAlphaOfLength(2) + "\"}");
|
|
|
+ assertOK(client().performRequest(request));
|
|
|
}
|
|
|
return numDocs;
|
|
|
}
|
|
@@ -105,7 +100,7 @@ public class IndexingIT extends ESRestTestCase {
|
|
|
logger.info("allowing shards on all nodes");
|
|
|
updateIndexSettings(index, Settings.builder().putNull("index.routing.allocation.include._name"));
|
|
|
ensureGreen(index);
|
|
|
- assertOK(client().performRequest("POST", index + "/_refresh"));
|
|
|
+ assertOK(client().performRequest(new Request("POST", index + "/_refresh")));
|
|
|
List<Shard> shards = buildShards(index, nodes, newNodeClient);
|
|
|
Shard primary = buildShards(index, nodes, newNodeClient).stream().filter(Shard::isPrimary).findFirst().get();
|
|
|
logger.info("primary resolved to: " + primary.getNode().getNodeName());
|
|
@@ -117,7 +112,7 @@ public class IndexingIT extends ESRestTestCase {
|
|
|
nUpdates = randomIntBetween(minUpdates, maxUpdates);
|
|
|
logger.info("indexing docs with [{}] concurrent updates after allowing shards on all nodes", nUpdates);
|
|
|
final int finalVersionForDoc2 = indexDocWithConcurrentUpdates(index, 2, nUpdates);
|
|
|
- assertOK(client().performRequest("POST", index + "/_refresh"));
|
|
|
+ assertOK(client().performRequest(new Request("POST", index + "/_refresh")));
|
|
|
shards = buildShards(index, nodes, newNodeClient);
|
|
|
primary = shards.stream().filter(Shard::isPrimary).findFirst().get();
|
|
|
logger.info("primary resolved to: " + primary.getNode().getNodeName());
|
|
@@ -133,7 +128,7 @@ public class IndexingIT extends ESRestTestCase {
|
|
|
nUpdates = randomIntBetween(minUpdates, maxUpdates);
|
|
|
logger.info("indexing docs with [{}] concurrent updates after moving primary", nUpdates);
|
|
|
final int finalVersionForDoc3 = indexDocWithConcurrentUpdates(index, 3, nUpdates);
|
|
|
- assertOK(client().performRequest("POST", index + "/_refresh"));
|
|
|
+ assertOK(client().performRequest(new Request("POST", index + "/_refresh")));
|
|
|
shards = buildShards(index, nodes, newNodeClient);
|
|
|
for (Shard shard : shards) {
|
|
|
assertVersion(index, 3, "_only_nodes:" + shard.getNode().getNodeName(), finalVersionForDoc3);
|
|
@@ -146,7 +141,7 @@ public class IndexingIT extends ESRestTestCase {
|
|
|
nUpdates = randomIntBetween(minUpdates, maxUpdates);
|
|
|
logger.info("indexing doc with [{}] concurrent updates after setting number of replicas to 0", nUpdates);
|
|
|
final int finalVersionForDoc4 = indexDocWithConcurrentUpdates(index, 4, nUpdates);
|
|
|
- assertOK(client().performRequest("POST", index + "/_refresh"));
|
|
|
+ assertOK(client().performRequest(new Request("POST", index + "/_refresh")));
|
|
|
shards = buildShards(index, nodes, newNodeClient);
|
|
|
for (Shard shard : shards) {
|
|
|
assertVersion(index, 4, "_only_nodes:" + shard.getNode().getNodeName(), finalVersionForDoc4);
|
|
@@ -159,7 +154,7 @@ public class IndexingIT extends ESRestTestCase {
|
|
|
nUpdates = randomIntBetween(minUpdates, maxUpdates);
|
|
|
logger.info("indexing doc with [{}] concurrent updates after setting number of replicas to 1", nUpdates);
|
|
|
final int finalVersionForDoc5 = indexDocWithConcurrentUpdates(index, 5, nUpdates);
|
|
|
- assertOK(client().performRequest("POST", index + "/_refresh"));
|
|
|
+ assertOK(client().performRequest(new Request("POST", index + "/_refresh")));
|
|
|
shards = buildShards(index, nodes, newNodeClient);
|
|
|
for (Shard shard : shards) {
|
|
|
assertVersion(index, 5, "_only_nodes:" + shard.getNode().getNodeName(), finalVersionForDoc5);
|
|
@@ -191,7 +186,7 @@ public class IndexingIT extends ESRestTestCase {
|
|
|
logger.info("allowing shards on all nodes");
|
|
|
updateIndexSettings(index, Settings.builder().putNull("index.routing.allocation.include._name"));
|
|
|
ensureGreen(index);
|
|
|
- assertOK(client().performRequest("POST", index + "/_refresh"));
|
|
|
+ assertOK(client().performRequest(new Request("POST", index + "/_refresh")));
|
|
|
for (final String bwcName : bwcNamesList) {
|
|
|
assertCount(index, "_only_nodes:" + bwcName, numDocs);
|
|
|
}
|
|
@@ -222,7 +217,7 @@ public class IndexingIT extends ESRestTestCase {
|
|
|
logger.info("setting number of replicas to 1");
|
|
|
updateIndexSettings(index, Settings.builder().put("index.number_of_replicas", 1));
|
|
|
ensureGreen(index);
|
|
|
- assertOK(client().performRequest("POST", index + "/_refresh"));
|
|
|
+ assertOK(client().performRequest(new Request("POST", index + "/_refresh")));
|
|
|
|
|
|
for (Shard shard : buildShards(index, nodes, newNodeClient)) {
|
|
|
assertCount(index, "_only_nodes:" + shard.node.nodeName, numDocs);
|
|
@@ -237,20 +232,18 @@ public class IndexingIT extends ESRestTestCase {
|
|
|
logger.info("cluster discovered: {}", nodes.toString());
|
|
|
|
|
|
// Create the repository before taking the snapshot.
|
|
|
- String repoConfig = Strings
|
|
|
+ Request request = new Request("PUT", "/_snapshot/repo");
|
|
|
+ request.setJsonEntity(Strings
|
|
|
.toString(JsonXContent.contentBuilder()
|
|
|
.startObject()
|
|
|
- .field("type", "fs")
|
|
|
- .startObject("settings")
|
|
|
- .field("compress", randomBoolean())
|
|
|
- .field("location", System.getProperty("tests.path.repo"))
|
|
|
- .endObject()
|
|
|
- .endObject());
|
|
|
-
|
|
|
- assertOK(
|
|
|
- client().performRequest("PUT", "/_snapshot/repo", emptyMap(),
|
|
|
- new StringEntity(repoConfig, ContentType.APPLICATION_JSON))
|
|
|
- );
|
|
|
+ .field("type", "fs")
|
|
|
+ .startObject("settings")
|
|
|
+ .field("compress", randomBoolean())
|
|
|
+ .field("location", System.getProperty("tests.path.repo"))
|
|
|
+ .endObject()
|
|
|
+ .endObject()));
|
|
|
+
|
|
|
+ assertOK(client().performRequest(request));
|
|
|
|
|
|
String bwcNames = nodes.getBWCNodes().stream().map(Node::getNodeName).collect(Collectors.joining(","));
|
|
|
|
|
@@ -264,34 +257,36 @@ public class IndexingIT extends ESRestTestCase {
|
|
|
createIndex(index, settings.build());
|
|
|
indexDocs(index, 0, between(50, 100));
|
|
|
ensureGreen(index);
|
|
|
- assertOK(client().performRequest("POST", index + "/_refresh"));
|
|
|
+ assertOK(client().performRequest(new Request("POST", index + "/_refresh")));
|
|
|
|
|
|
- assertOK(
|
|
|
- client().performRequest("PUT", "/_snapshot/repo/bwc-snapshot", singletonMap("wait_for_completion", "true"),
|
|
|
- new StringEntity("{\"indices\": \"" + index + "\"}", ContentType.APPLICATION_JSON))
|
|
|
- );
|
|
|
+ request = new Request("PUT", "/_snapshot/repo/bwc-snapshot");
|
|
|
+ request.addParameter("wait_for_completion", "true");
|
|
|
+ request.setJsonEntity("{\"indices\": \"" + index + "\"}");
|
|
|
+ assertOK(client().performRequest(request));
|
|
|
|
|
|
// Allocating shards on all nodes, taking snapshots should happen on all nodes.
|
|
|
updateIndexSettings(index, Settings.builder().putNull("index.routing.allocation.include._name"));
|
|
|
ensureGreen(index);
|
|
|
- assertOK(client().performRequest("POST", index + "/_refresh"));
|
|
|
+ assertOK(client().performRequest(new Request("POST", index + "/_refresh")));
|
|
|
|
|
|
- assertOK(
|
|
|
- client().performRequest("PUT", "/_snapshot/repo/mixed-snapshot", singletonMap("wait_for_completion", "true"),
|
|
|
- new StringEntity("{\"indices\": \"" + index + "\"}", ContentType.APPLICATION_JSON))
|
|
|
- );
|
|
|
+ request = new Request("PUT", "/_snapshot/repo/mixed-snapshot");
|
|
|
+ request.addParameter("wait_for_completion", "true");
|
|
|
+ request.setJsonEntity("{\"indices\": \"" + index + "\"}");
|
|
|
}
|
|
|
|
|
|
private void assertCount(final String index, final String preference, final int expectedCount) throws IOException {
|
|
|
- final Response response = client().performRequest("GET", index + "/_count", Collections.singletonMap("preference", preference));
|
|
|
+ Request request = new Request("GET", index + "/_count");
|
|
|
+ request.addParameter("preference", preference);
|
|
|
+ final Response response = client().performRequest(request);
|
|
|
assertOK(response);
|
|
|
final int actualCount = Integer.parseInt(ObjectPath.createFromResponse(response).evaluate("count").toString());
|
|
|
assertThat(actualCount, equalTo(expectedCount));
|
|
|
}
|
|
|
|
|
|
private void assertVersion(final String index, final int docId, final String preference, final int expectedVersion) throws IOException {
|
|
|
- final Response response = client().performRequest("GET", index + "/test/" + docId,
|
|
|
- Collections.singletonMap("preference", preference));
|
|
|
+ Request request = new Request("GET", index + "/test/" + docId);
|
|
|
+ request.addParameter("preference", preference);
|
|
|
+ final Response response = client().performRequest(request);
|
|
|
assertOK(response);
|
|
|
final int actualVersion = Integer.parseInt(ObjectPath.createFromResponse(response).evaluate("_version").toString());
|
|
|
assertThat("version mismatch for doc [" + docId + "] preference [" + preference + "]", actualVersion, equalTo(expectedVersion));
|
|
@@ -323,7 +318,9 @@ public class IndexingIT extends ESRestTestCase {
|
|
|
}
|
|
|
|
|
|
private List<Shard> buildShards(String index, Nodes nodes, RestClient client) throws IOException {
|
|
|
- Response response = client.performRequest("GET", index + "/_stats", singletonMap("level", "shards"));
|
|
|
+ Request request = new Request("GET", index + "/_stats");
|
|
|
+ request.addParameter("level", "shards");
|
|
|
+ Response response = client.performRequest(request);
|
|
|
List<Object> shardStats = ObjectPath.createFromResponse(response).evaluate("indices." + index + ".shards.0");
|
|
|
ArrayList<Shard> shards = new ArrayList<>();
|
|
|
for (Object shard : shardStats) {
|
|
@@ -341,7 +338,7 @@ public class IndexingIT extends ESRestTestCase {
|
|
|
}
|
|
|
|
|
|
private Nodes buildNodeAndVersions() throws IOException {
|
|
|
- Response response = client().performRequest("GET", "_nodes");
|
|
|
+ Response response = client().performRequest(new Request("GET", "_nodes"));
|
|
|
ObjectPath objectPath = ObjectPath.createFromResponse(response);
|
|
|
Map<String, Object> nodesAsMap = objectPath.evaluate("nodes");
|
|
|
Nodes nodes = new Nodes();
|
|
@@ -352,7 +349,7 @@ public class IndexingIT extends ESRestTestCase {
|
|
|
Version.fromString(objectPath.evaluate("nodes." + id + ".version")),
|
|
|
HttpHost.create(objectPath.evaluate("nodes." + id + ".http.publish_address"))));
|
|
|
}
|
|
|
- response = client().performRequest("GET", "_cluster/state");
|
|
|
+ response = client().performRequest(new Request("GET", "_cluster/state"));
|
|
|
nodes.setMasterNodeId(ObjectPath.createFromResponse(response).evaluate("master_node"));
|
|
|
return nodes;
|
|
|
}
|