|
@@ -119,6 +119,7 @@ import java.util.stream.IntStream;
|
|
|
import java.util.stream.Stream;
|
|
|
|
|
|
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
|
|
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
|
|
|
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
|
|
|
import static org.elasticsearch.xpack.ccr.CcrRetentionLeases.retentionLeaseId;
|
|
|
import static org.hamcrest.Matchers.containsString;
|
|
@@ -369,9 +370,7 @@ public class IndexFollowingIT extends CcrIntegTestCase {
|
|
|
leaderClient().prepareIndex("index1").setId(Long.toString(i)).setSource(source, XContentType.JSON).get();
|
|
|
}
|
|
|
|
|
|
- assertBusy(
|
|
|
- () -> assertThat(followerClient().prepareSearch("index2").get().getHits().getTotalHits().value, equalTo(firstBatchNumDocs))
|
|
|
- );
|
|
|
+ assertBusy(() -> assertHitCount(followerClient().prepareSearch("index2"), firstBatchNumDocs));
|
|
|
MappingMetadata mappingMetadata = followerClient().admin().indices().prepareGetMappings("index2").get().getMappings().get("index2");
|
|
|
assertThat(XContentMapValues.extractValue("properties.f.type", mappingMetadata.sourceAsMap()), equalTo("integer"));
|
|
|
assertThat(XContentMapValues.extractValue("properties.k", mappingMetadata.sourceAsMap()), nullValue());
|
|
@@ -382,12 +381,7 @@ public class IndexFollowingIT extends CcrIntegTestCase {
|
|
|
leaderClient().prepareIndex("index1").setId(Long.toString(i)).setSource(source, XContentType.JSON).get();
|
|
|
}
|
|
|
|
|
|
- assertBusy(
|
|
|
- () -> assertThat(
|
|
|
- followerClient().prepareSearch("index2").get().getHits().getTotalHits().value,
|
|
|
- equalTo(firstBatchNumDocs + secondBatchNumDocs)
|
|
|
- )
|
|
|
- );
|
|
|
+ assertBusy(() -> assertHitCount(followerClient().prepareSearch("index2"), firstBatchNumDocs + secondBatchNumDocs));
|
|
|
mappingMetadata = followerClient().admin().indices().prepareGetMappings("index2").get().getMappings().get("index2");
|
|
|
assertThat(XContentMapValues.extractValue("properties.f.type", mappingMetadata.sourceAsMap()), equalTo("integer"));
|
|
|
assertThat(XContentMapValues.extractValue("properties.k.type", mappingMetadata.sourceAsMap()), equalTo("long"));
|
|
@@ -413,7 +407,7 @@ public class IndexFollowingIT extends CcrIntegTestCase {
|
|
|
followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
|
|
|
|
|
|
leaderClient().prepareIndex("index1").setId("1").setSource("{\"f\":1}", XContentType.JSON).get();
|
|
|
- assertBusy(() -> assertThat(followerClient().prepareSearch("index2").get().getHits().getTotalHits().value, equalTo(1L)));
|
|
|
+ assertBusy(() -> assertHitCount(followerClient().prepareSearch("index2"), 1));
|
|
|
pauseFollow("index2");
|
|
|
|
|
|
MappingMetadata mappingMetadata = followerClient().admin().indices().prepareGetMappings("index2").get().getMappings().get("index2");
|
|
@@ -711,7 +705,7 @@ public class IndexFollowingIT extends CcrIntegTestCase {
|
|
|
followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
|
|
|
|
|
|
leaderClient().prepareIndex("index1").setId("1").setSource("{}", XContentType.JSON).get();
|
|
|
- assertBusy(() -> assertThat(followerClient().prepareSearch("index2").get().getHits().getTotalHits().value, equalTo(1L)));
|
|
|
+ assertBusy(() -> assertHitCount(followerClient().prepareSearch("index2"), 1));
|
|
|
|
|
|
leaderClient().admin().indices().close(new CloseIndexRequest("index1")).actionGet();
|
|
|
assertBusy(() -> {
|
|
@@ -735,7 +729,7 @@ public class IndexFollowingIT extends CcrIntegTestCase {
|
|
|
|
|
|
leaderClient().admin().indices().open(new OpenIndexRequest("index1")).actionGet();
|
|
|
leaderClient().prepareIndex("index1").setId("2").setSource("{}", XContentType.JSON).get();
|
|
|
- assertBusy(() -> assertThat(followerClient().prepareSearch("index2").get().getHits().getTotalHits().value, equalTo(2L)));
|
|
|
+ assertBusy(() -> assertHitCount(followerClient().prepareSearch("index2"), 2));
|
|
|
|
|
|
pauseFollow("index2");
|
|
|
}
|
|
@@ -757,7 +751,7 @@ public class IndexFollowingIT extends CcrIntegTestCase {
|
|
|
followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
|
|
|
|
|
|
leaderClient().prepareIndex("index1").setId("1").setSource("{}", XContentType.JSON).get();
|
|
|
- assertBusy(() -> assertThat(followerClient().prepareSearch("index2").get().getHits().getTotalHits().value, equalTo(1L)));
|
|
|
+ assertBusy(() -> assertHitCount(followerClient().prepareSearch("index2"), 1));
|
|
|
|
|
|
followerClient().admin().indices().close(new CloseIndexRequest("index2").masterNodeTimeout(TimeValue.MAX_VALUE)).actionGet();
|
|
|
leaderClient().prepareIndex("index1").setId("2").setSource("{}", XContentType.JSON).get();
|
|
@@ -769,7 +763,7 @@ public class IndexFollowingIT extends CcrIntegTestCase {
|
|
|
assertThat(response.getStatsResponses().get(0).status().failedWriteRequests(), greaterThanOrEqualTo(1L));
|
|
|
});
|
|
|
followerClient().admin().indices().open(new OpenIndexRequest("index2").masterNodeTimeout(TimeValue.MAX_VALUE)).actionGet();
|
|
|
- assertBusy(() -> assertThat(followerClient().prepareSearch("index2").get().getHits().getTotalHits().value, equalTo(2L)));
|
|
|
+ assertBusy(() -> assertHitCount(followerClient().prepareSearch("index2"), 2));
|
|
|
|
|
|
pauseFollow("index2");
|
|
|
}
|
|
@@ -791,7 +785,7 @@ public class IndexFollowingIT extends CcrIntegTestCase {
|
|
|
followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
|
|
|
|
|
|
leaderClient().prepareIndex("index1").setId("1").setSource("{}", XContentType.JSON).get();
|
|
|
- assertBusy(() -> assertThat(followerClient().prepareSearch("index2").get().getHits().getTotalHits().value, equalTo(1L)));
|
|
|
+ assertBusy(() -> assertHitCount(followerClient().prepareSearch("index2"), 1));
|
|
|
|
|
|
leaderClient().admin().indices().delete(new DeleteIndexRequest("index1")).actionGet();
|
|
|
assertBusy(() -> {
|
|
@@ -872,7 +866,7 @@ public class IndexFollowingIT extends CcrIntegTestCase {
|
|
|
followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
|
|
|
|
|
|
leaderClient().prepareIndex("index1").setId("1").setSource("{}", XContentType.JSON).get();
|
|
|
- assertBusy(() -> assertThat(followerClient().prepareSearch("index2").get().getHits().getTotalHits().value, equalTo(1L)));
|
|
|
+ assertBusy(() -> assertHitCount(followerClient().prepareSearch("index2"), 1));
|
|
|
|
|
|
followerClient().admin().indices().delete(new DeleteIndexRequest("index2").masterNodeTimeout(TimeValue.MAX_VALUE)).actionGet();
|
|
|
leaderClient().prepareIndex("index1").setId("2").setSource("{}", XContentType.JSON).get();
|
|
@@ -935,7 +929,7 @@ public class IndexFollowingIT extends CcrIntegTestCase {
|
|
|
PutFollowAction.Request followRequest = putFollow("index1", "index2");
|
|
|
followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
|
|
|
leaderClient().prepareIndex("index1").setSource("{}", XContentType.JSON).get();
|
|
|
- assertBusy(() -> { assertThat(followerClient().prepareSearch("index2").get().getHits().getTotalHits().value, equalTo(1L)); });
|
|
|
+ assertBusy(() -> assertHitCount(followerClient().prepareSearch("index2"), 1));
|
|
|
|
|
|
// Indexing directly into index2 would fail now, because index2 is a follow index.
|
|
|
// We can't test this here because an assertion trips before an actual error is thrown and then index call hangs.
|
|
@@ -952,7 +946,7 @@ public class IndexFollowingIT extends CcrIntegTestCase {
|
|
|
.setSource("{}", XContentType.JSON)
|
|
|
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
|
|
|
.get();
|
|
|
- assertThat(followerClient().prepareSearch("index2").get().getHits().getTotalHits().value, equalTo(2L));
|
|
|
+ assertHitCount(followerClient().prepareSearch("index2"), 2);
|
|
|
}
|
|
|
|
|
|
public void testUnknownClusterAlias() throws Exception {
|
|
@@ -1024,9 +1018,7 @@ public class IndexFollowingIT extends CcrIntegTestCase {
|
|
|
for (long i = 0; i < firstBatchNumDocs; i++) {
|
|
|
leaderClient().prepareIndex("leader").setSource("{}", XContentType.JSON).get();
|
|
|
}
|
|
|
- assertBusy(
|
|
|
- () -> assertThat(followerClient().prepareSearch("follower").get().getHits().getTotalHits().value, equalTo(firstBatchNumDocs))
|
|
|
- );
|
|
|
+ assertBusy(() -> assertHitCount(followerClient().prepareSearch("follower"), firstBatchNumDocs));
|
|
|
|
|
|
// Sanity check that the setting has not been set in follower index:
|
|
|
{
|
|
@@ -1053,10 +1045,7 @@ public class IndexFollowingIT extends CcrIntegTestCase {
|
|
|
assertThat(getFollowTaskSettingsVersion("follower"), equalTo(2L));
|
|
|
|
|
|
try {
|
|
|
- assertThat(
|
|
|
- followerClient().prepareSearch("follower").get().getHits().getTotalHits().value,
|
|
|
- equalTo(firstBatchNumDocs + secondBatchNumDocs)
|
|
|
- );
|
|
|
+ assertHitCount(followerClient().prepareSearch("follower"), firstBatchNumDocs + secondBatchNumDocs);
|
|
|
} catch (Exception e) {
|
|
|
throw new AssertionError("error while searching", e);
|
|
|
}
|
|
@@ -1080,9 +1069,7 @@ public class IndexFollowingIT extends CcrIntegTestCase {
|
|
|
for (long i = 0; i < firstBatchNumDocs; i++) {
|
|
|
leaderClient().prepareIndex("leader").setSource("{}", XContentType.JSON).get();
|
|
|
}
|
|
|
- assertBusy(
|
|
|
- () -> assertThat(followerClient().prepareSearch("follower").get().getHits().getTotalHits().value, equalTo(firstBatchNumDocs))
|
|
|
- );
|
|
|
+ assertBusy(() -> assertHitCount(followerClient().prepareSearch("follower"), firstBatchNumDocs));
|
|
|
|
|
|
// Sanity check that the setting has not been set in follower index:
|
|
|
{
|
|
@@ -1108,10 +1095,7 @@ public class IndexFollowingIT extends CcrIntegTestCase {
|
|
|
assertThat(getFollowTaskSettingsVersion("follower"), equalTo(2L));
|
|
|
|
|
|
try {
|
|
|
- assertThat(
|
|
|
- followerClient().prepareSearch("follower").get().getHits().getTotalHits().value,
|
|
|
- equalTo(firstBatchNumDocs + secondBatchNumDocs)
|
|
|
- );
|
|
|
+ assertHitCount(followerClient().prepareSearch("follower"), firstBatchNumDocs + secondBatchNumDocs);
|
|
|
} catch (Exception e) {
|
|
|
throw new AssertionError("error while searching", e);
|
|
|
}
|
|
@@ -1133,9 +1117,7 @@ public class IndexFollowingIT extends CcrIntegTestCase {
|
|
|
leaderClient().prepareIndex("leader").setSource("{}", XContentType.JSON).get();
|
|
|
}
|
|
|
|
|
|
- assertBusy(
|
|
|
- () -> assertThat(followerClient().prepareSearch("follower").get().getHits().getTotalHits().value, equalTo(firstBatchNumDocs))
|
|
|
- );
|
|
|
+ assertBusy(() -> assertHitCount(followerClient().prepareSearch("follower"), firstBatchNumDocs));
|
|
|
assertThat(getFollowTaskSettingsVersion("follower"), equalTo(1L));
|
|
|
assertThat(getFollowTaskMappingVersion("follower"), equalTo(1L));
|
|
|
|
|
@@ -1185,10 +1167,7 @@ public class IndexFollowingIT extends CcrIntegTestCase {
|
|
|
);
|
|
|
|
|
|
try {
|
|
|
- assertThat(
|
|
|
- followerClient().prepareSearch("follower").get().getHits().getTotalHits().value,
|
|
|
- equalTo(firstBatchNumDocs + secondBatchNumDocs)
|
|
|
- );
|
|
|
+ assertHitCount(followerClient().prepareSearch("follower"), firstBatchNumDocs + secondBatchNumDocs);
|
|
|
} catch (Exception e) {
|
|
|
throw new AssertionError("error while searching", e);
|
|
|
}
|
|
@@ -1574,7 +1553,7 @@ public class IndexFollowingIT extends CcrIntegTestCase {
|
|
|
followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
|
|
|
|
|
|
leaderClient().prepareIndex("index1").setId("1").setSource("{}", XContentType.JSON).get();
|
|
|
- assertBusy(() -> assertThat(followerClient().prepareSearch("index2").get().getHits().getTotalHits().value, equalTo(1L)));
|
|
|
+ assertBusy(() -> assertHitCount(followerClient().prepareSearch("index2"), 1));
|
|
|
|
|
|
assertBusy(() -> {
|
|
|
String action = ShardFollowTask.NAME + "[c]";
|