|
|
@@ -53,14 +53,18 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAllS
|
|
|
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
|
|
|
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoTimeout;
|
|
|
|
|
|
-@TestLogging("_root:DEBUG,org.elasticsearch.index.shard:TRACE,org.elasticsearch.cluster.service:TRACE,org.elasticsearch.index.seqno:TRACE,org.elasticsearch.indices.recovery:TRACE")
|
|
|
+@TestLogging("_root:DEBUG,org.elasticsearch.index.shard:TRACE,org.elasticsearch.cluster.service:TRACE," +
|
|
|
+ "org.elasticsearch.index.seqno:TRACE,org.elasticsearch.indices.recovery:TRACE")
|
|
|
public class RecoveryWhileUnderLoadIT extends ESIntegTestCase {
|
|
|
private final Logger logger = LogManager.getLogger(RecoveryWhileUnderLoadIT.class);
|
|
|
|
|
|
public void testRecoverWhileUnderLoadAllocateReplicasTest() throws Exception {
|
|
|
logger.info("--> creating test index ...");
|
|
|
int numberOfShards = numberOfShards();
|
|
|
- assertAcked(prepareCreate("test", 1, Settings.builder().put(SETTING_NUMBER_OF_SHARDS, numberOfShards).put(SETTING_NUMBER_OF_REPLICAS, 1).put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.ASYNC)));
|
|
|
+ assertAcked(prepareCreate("test", 1, Settings.builder()
|
|
|
+ .put(SETTING_NUMBER_OF_SHARDS, numberOfShards)
|
|
|
+ .put(SETTING_NUMBER_OF_REPLICAS, 1)
|
|
|
+ .put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.ASYNC)));
|
|
|
|
|
|
final int totalNumDocs = scaledRandomIntBetween(200, 10000);
|
|
|
int waitFor = totalNumDocs / 10;
|
|
|
@@ -92,7 +96,8 @@ public class RecoveryWhileUnderLoadIT extends ESIntegTestCase {
|
|
|
|
|
|
logger.info("--> waiting for GREEN health status ...");
|
|
|
// make sure the cluster state is green, and all has been recovered
|
|
|
- assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("5m").setWaitForGreenStatus());
|
|
|
+ assertNoTimeout(client().admin().cluster().prepareHealth()
|
|
|
+ .setWaitForEvents(Priority.LANGUID).setTimeout("5m").setWaitForGreenStatus());
|
|
|
|
|
|
logger.info("--> waiting for {} docs to be indexed ...", totalNumDocs);
|
|
|
waitForDocs(totalNumDocs, indexer);
|
|
|
@@ -113,7 +118,10 @@ public class RecoveryWhileUnderLoadIT extends ESIntegTestCase {
|
|
|
public void testRecoverWhileUnderLoadAllocateReplicasRelocatePrimariesTest() throws Exception {
|
|
|
logger.info("--> creating test index ...");
|
|
|
int numberOfShards = numberOfShards();
|
|
|
- assertAcked(prepareCreate("test", 1, Settings.builder().put(SETTING_NUMBER_OF_SHARDS, numberOfShards).put(SETTING_NUMBER_OF_REPLICAS, 1).put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.ASYNC)));
|
|
|
+ assertAcked(prepareCreate("test", 1, Settings.builder()
|
|
|
+ .put(SETTING_NUMBER_OF_SHARDS, numberOfShards)
|
|
|
+ .put(SETTING_NUMBER_OF_REPLICAS, 1)
|
|
|
+ .put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.ASYNC)));
|
|
|
|
|
|
final int totalNumDocs = scaledRandomIntBetween(200, 10000);
|
|
|
int waitFor = totalNumDocs / 10;
|
|
|
@@ -142,7 +150,8 @@ public class RecoveryWhileUnderLoadIT extends ESIntegTestCase {
|
|
|
allowNodes("test", 4);
|
|
|
|
|
|
logger.info("--> waiting for GREEN health status ...");
|
|
|
- assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("5m").setWaitForGreenStatus());
|
|
|
+ assertNoTimeout(client().admin().cluster().prepareHealth()
|
|
|
+ .setWaitForEvents(Priority.LANGUID).setTimeout("5m").setWaitForGreenStatus());
|
|
|
|
|
|
|
|
|
logger.info("--> waiting for {} docs to be indexed ...", totalNumDocs);
|
|
|
@@ -164,7 +173,9 @@ public class RecoveryWhileUnderLoadIT extends ESIntegTestCase {
|
|
|
public void testRecoverWhileUnderLoadWithReducedAllowedNodes() throws Exception {
|
|
|
logger.info("--> creating test index ...");
|
|
|
int numberOfShards = numberOfShards();
|
|
|
- assertAcked(prepareCreate("test", 2, Settings.builder().put(SETTING_NUMBER_OF_SHARDS, numberOfShards).put(SETTING_NUMBER_OF_REPLICAS, 1).put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.ASYNC)));
|
|
|
+ assertAcked(prepareCreate("test", 2, Settings.builder()
|
|
|
+ .put(SETTING_NUMBER_OF_SHARDS, numberOfShards).put(SETTING_NUMBER_OF_REPLICAS, 1)
|
|
|
+ .put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.ASYNC)));
|
|
|
|
|
|
final int totalNumDocs = scaledRandomIntBetween(200, 10000);
|
|
|
int waitFor = totalNumDocs / 10;
|
|
|
@@ -194,7 +205,10 @@ public class RecoveryWhileUnderLoadIT extends ESIntegTestCase {
|
|
|
allowNodes("test", 4);
|
|
|
|
|
|
logger.info("--> waiting for GREEN health status ...");
|
|
|
- assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("5m").setWaitForGreenStatus().setWaitForNoRelocatingShards(true));
|
|
|
+ assertNoTimeout(client().admin().cluster().prepareHealth()
|
|
|
+ .setWaitForEvents(Priority.LANGUID).setTimeout("5m")
|
|
|
+ .setWaitForGreenStatus()
|
|
|
+ .setWaitForNoRelocatingShards(true));
|
|
|
|
|
|
logger.info("--> waiting for {} docs to be indexed ...", totalNumDocs);
|
|
|
waitForDocs(totalNumDocs, indexer);
|
|
|
@@ -205,23 +219,31 @@ public class RecoveryWhileUnderLoadIT extends ESIntegTestCase {
|
|
|
logger.info("--> allow 3 nodes for index [test] ...");
|
|
|
allowNodes("test", 3);
|
|
|
logger.info("--> waiting for relocations ...");
|
|
|
- assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("5m").setWaitForNoRelocatingShards(true));
|
|
|
+ assertNoTimeout(client().admin().cluster().prepareHealth()
|
|
|
+ .setWaitForEvents(Priority.LANGUID).setTimeout("5m")
|
|
|
+ .setWaitForNoRelocatingShards(true));
|
|
|
|
|
|
logger.info("--> allow 2 nodes for index [test] ...");
|
|
|
allowNodes("test", 2);
|
|
|
logger.info("--> waiting for relocations ...");
|
|
|
- assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("5m").setWaitForNoRelocatingShards(true));
|
|
|
+ assertNoTimeout(client().admin().cluster().prepareHealth()
|
|
|
+ .setWaitForEvents(Priority.LANGUID).setTimeout("5m")
|
|
|
+ .setWaitForNoRelocatingShards(true));
|
|
|
|
|
|
logger.info("--> allow 1 nodes for index [test] ...");
|
|
|
allowNodes("test", 1);
|
|
|
logger.info("--> waiting for relocations ...");
|
|
|
- assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("5m").setWaitForNoRelocatingShards(true));
|
|
|
+ assertNoTimeout(client().admin().cluster().prepareHealth()
|
|
|
+ .setWaitForEvents(Priority.LANGUID).setTimeout("5m")
|
|
|
+ .setWaitForNoRelocatingShards(true));
|
|
|
|
|
|
logger.info("--> marking and waiting for indexing threads to stop ...");
|
|
|
indexer.stop();
|
|
|
logger.info("--> indexing threads stopped");
|
|
|
|
|
|
- assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("5m").setWaitForNoRelocatingShards(true));
|
|
|
+ assertNoTimeout(client().admin().cluster().prepareHealth()
|
|
|
+ .setWaitForEvents(Priority.LANGUID).setTimeout("5m")
|
|
|
+ .setWaitForNoRelocatingShards(true));
|
|
|
|
|
|
logger.info("--> refreshing the index");
|
|
|
refreshAndAssert();
|
|
|
@@ -235,7 +257,10 @@ public class RecoveryWhileUnderLoadIT extends ESIntegTestCase {
|
|
|
final int numReplicas = 0;
|
|
|
logger.info("--> creating test index ...");
|
|
|
int allowNodes = 2;
|
|
|
- assertAcked(prepareCreate("test", 3, Settings.builder().put(SETTING_NUMBER_OF_SHARDS, numShards).put(SETTING_NUMBER_OF_REPLICAS, numReplicas).put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.ASYNC)));
|
|
|
+ assertAcked(prepareCreate("test", 3, Settings.builder()
|
|
|
+ .put(SETTING_NUMBER_OF_SHARDS, numShards)
|
|
|
+ .put(SETTING_NUMBER_OF_REPLICAS, numReplicas)
|
|
|
+ .put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.ASYNC)));
|
|
|
|
|
|
final int numDocs = scaledRandomIntBetween(200, 9999);
|
|
|
|
|
|
@@ -258,7 +283,8 @@ public class RecoveryWhileUnderLoadIT extends ESIntegTestCase {
|
|
|
logger.info("--> indexing threads stopped");
|
|
|
logger.info("--> bump up number of replicas to 1 and allow all nodes to hold the index");
|
|
|
allowNodes("test", 3);
|
|
|
- assertAcked(client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put("number_of_replicas", 1)).get());
|
|
|
+ assertAcked(client().admin().indices().prepareUpdateSettings("test")
|
|
|
+ .setSettings(Settings.builder().put("number_of_replicas", 1)).get());
|
|
|
ensureGreen(TimeValue.timeValueMinutes(5));
|
|
|
|
|
|
logger.info("--> refreshing the index");
|
|
|
@@ -273,7 +299,8 @@ public class RecoveryWhileUnderLoadIT extends ESIntegTestCase {
|
|
|
SearchResponse[] iterationResults = new SearchResponse[iterations];
|
|
|
boolean error = false;
|
|
|
for (int i = 0; i < iterations; i++) {
|
|
|
- SearchResponse searchResponse = client().prepareSearch().setSize((int) numberOfDocs).setQuery(matchAllQuery()).addSort("id", SortOrder.ASC).get();
|
|
|
+ SearchResponse searchResponse = client().prepareSearch().setSize((int) numberOfDocs).setQuery(matchAllQuery())
|
|
|
+ .addSort("id", SortOrder.ASC).get();
|
|
|
logSearchResponse(numberOfShards, numberOfDocs, i, searchResponse);
|
|
|
iterationResults[i] = searchResponse;
|
|
|
if (searchResponse.getHits().getTotalHits().value != numberOfDocs) {
|
|
|
@@ -286,7 +313,8 @@ public class RecoveryWhileUnderLoadIT extends ESIntegTestCase {
|
|
|
IndicesStatsResponse indicesStatsResponse = client().admin().indices().prepareStats().get();
|
|
|
for (ShardStats shardStats : indicesStatsResponse.getShards()) {
|
|
|
DocsStats docsStats = shardStats.getStats().docs;
|
|
|
- logger.info("shard [{}] - count {}, primary {}", shardStats.getShardRouting().id(), docsStats.getCount(), shardStats.getShardRouting().primary());
|
|
|
+ logger.info("shard [{}] - count {}, primary {}", shardStats.getShardRouting().id(), docsStats.getCount(),
|
|
|
+ shardStats.getShardRouting().primary());
|
|
|
}
|
|
|
|
|
|
ClusterService clusterService = clusterService();
|
|
|
@@ -332,12 +360,14 @@ public class RecoveryWhileUnderLoadIT extends ESIntegTestCase {
|
|
|
}
|
|
|
|
|
|
private void logSearchResponse(int numberOfShards, long numberOfDocs, int iteration, SearchResponse searchResponse) {
|
|
|
- logger.info("iteration [{}] - successful shards: {} (expected {})", iteration, searchResponse.getSuccessfulShards(), numberOfShards);
|
|
|
+ logger.info("iteration [{}] - successful shards: {} (expected {})", iteration,
|
|
|
+ searchResponse.getSuccessfulShards(), numberOfShards);
|
|
|
logger.info("iteration [{}] - failed shards: {} (expected 0)", iteration, searchResponse.getFailedShards());
|
|
|
if (searchResponse.getShardFailures() != null && searchResponse.getShardFailures().length > 0) {
|
|
|
logger.info("iteration [{}] - shard failures: {}", iteration, Arrays.toString(searchResponse.getShardFailures()));
|
|
|
}
|
|
|
- logger.info("iteration [{}] - returned documents: {} (expected {})", iteration, searchResponse.getHits().getTotalHits().value, numberOfDocs);
|
|
|
+ logger.info("iteration [{}] - returned documents: {} (expected {})", iteration,
|
|
|
+ searchResponse.getHits().getTotalHits().value, numberOfDocs);
|
|
|
}
|
|
|
|
|
|
private void refreshAndAssert() throws Exception {
|