|
|
@@ -46,6 +46,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
|
|
|
import org.elasticsearch.cluster.node.DiscoveryNode;
|
|
|
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
|
|
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
|
|
|
+import org.elasticsearch.cluster.routing.ShardRoutingState;
|
|
|
import org.elasticsearch.cluster.routing.RecoverySource;
|
|
|
import org.elasticsearch.cluster.routing.RecoverySource.PeerRecoverySource;
|
|
|
import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
|
|
|
@@ -324,6 +325,86 @@ public class IndexRecoveryIT extends ESIntegTestCase {
|
|
|
assertHitCount(client().prepareSearch(INDEX_NAME).setSize(0).get(), numOfDocs);
|
|
|
}
|
|
|
|
|
|
+    /**
+     * Restarts the node holding the replica while a peer recovery from node A to a fresh node C is being held,
+     * and checks that once the cluster is green again the primary is on node A and the replica is on node B.
+     * <p>
+     * The recovery towards node C is held by a send behavior on node A's transport service that blocks the
+     * {@code CLEAN_FILES} request until a latch is released. The latch is always released and the send behavior
+     * always removed, even when an assertion fails, so that a failing run does not leave a thread parked on the latch.
+     */
+    public void testCancelNewShardRecoveryAndUsesExistingShardCopy() throws Exception {
+        logger.info("--> start node A");
+        final String nodeA = internalCluster().startNode();
+
+        logger.info("--> create index on node: {}", nodeA);
+        // the returned stats are not needed by this test, only the created and populated index
+        createAndPopulateIndex(INDEX_NAME, 1, SHARD_COUNT, REPLICA_COUNT);
+
+        logger.info("--> start node B");
+        // force a shard recovery from nodeA to nodeB
+        final String nodeB = internalCluster().startNode();
+
+        logger.info("--> add replica for {} on node: {}", INDEX_NAME, nodeB);
+        // NOTE(review): the zero node-left delay presumably lets the replica be reassigned as soon as nodeB stops - confirm
+        assertAcked(client().admin().indices().prepareUpdateSettings(INDEX_NAME)
+            .setSettings(Settings.builder()
+                .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
+                .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), 0)));
+        ensureGreen(INDEX_NAME);
+
+        logger.info("--> start node C");
+        final String nodeC = internalCluster().startNode();
+
+        // do a synced flush to generate a sync id; no shard may fail it
+        assertThat(client().admin().indices().prepareSyncedFlush(INDEX_NAME).get().failedShards(), equalTo(0));
+
+        // hold the peer recovery at the CLEAN_FILES request (phase 1, going by the latch names) once nodeB is down
+        final CountDownLatch phase1ReadyBlocked = new CountDownLatch(1);
+        final CountDownLatch allowToCompletePhase1Latch = new CountDownLatch(1);
+        final MockTransportService transportService =
+            (MockTransportService) internalCluster().getInstance(TransportService.class, nodeA);
+        transportService.addSendBehavior((connection, requestId, action, request, options) -> {
+            if (PeerRecoveryTargetService.Actions.CLEAN_FILES.equals(action)) {
+                // signal the test that the recovery reached CLEAN_FILES, then park until the test releases it
+                phase1ReadyBlocked.countDown();
+                try {
+                    allowToCompletePhase1Latch.await();
+                } catch (InterruptedException e) {
+                    // keep the interrupt status visible to whoever owns this thread before failing
+                    Thread.currentThread().interrupt();
+                    throw new AssertionError(e);
+                }
+            }
+            connection.sendRequest(requestId, action, request, options);
+        });
+
+        try {
+            logger.info("--> restart node B");
+            internalCluster().restartNode(nodeB,
+                new InternalTestCluster.RestartCallback() {
+                    @Override
+                    public Settings onNodeStopped(String nodeName) throws Exception {
+                        phase1ReadyBlocked.await();
+                        // nodeB is stopped and a peer recovery from nodeA to nodeC is in flight;
+                        // it is expected to be cancelled after nodeB gets started again.
+                        RecoveryResponse response = client().admin().indices().prepareRecoveries(INDEX_NAME).execute().actionGet();
+
+                        List<RecoveryState> recoveryStates = response.shardRecoveryStates().get(INDEX_NAME);
+                        List<RecoveryState> nodeCRecoveryStates = findRecoveriesForTargetNode(nodeC, recoveryStates);
+                        assertThat(nodeCRecoveryStates.size(), equalTo(1));
+
+                        assertOnGoingRecoveryState(nodeCRecoveryStates.get(0), 0, PeerRecoverySource.INSTANCE,
+                            false, nodeA, nodeC);
+                        validateIndexRecoveryState(nodeCRecoveryStates.get(0).getIndex());
+
+                        return super.onNodeStopped(nodeName);
+                    }
+                });
+
+            // wait for peer recovery from nodeA to nodeB which is a no-op recovery so it skips the CLEAN_FILES stage
+            // and hence is not blocked
+            ensureGreen();
+        } finally {
+            // always unblock the held CLEAN_FILES request and drop the send behavior, even if an assertion above failed
+            allowToCompletePhase1Latch.countDown();
+            transportService.clearAllRules();
+        }
+
+        // make sure nodeA has primary and nodeB has replica
+        ClusterState state = client().admin().cluster().prepareState().get().getState();
+        List<ShardRouting> startedShards = state.routingTable().shardsWithState(ShardRoutingState.STARTED);
+        assertThat(startedShards.size(), equalTo(2));
+        for (ShardRouting shardRouting : startedShards) {
+            if (shardRouting.primary()) {
+                assertThat(state.nodes().get(shardRouting.currentNodeId()).getName(), equalTo(nodeA));
+            } else {
+                assertThat(state.nodes().get(shardRouting.currentNodeId()).getName(), equalTo(nodeB));
+            }
+        }
+    }
|
|
|
+
|
|
|
public void testRerouteRecovery() throws Exception {
|
|
|
logger.info("--> start node A");
|
|
|
final String nodeA = internalCluster().startNode();
|