
Fix CloseWhileRelocatingShardsIT (#38728)
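
The test now blocks START_RECOVERY requests only for the shards being relocated, installs the
blocking send behavior between the target node and every other data node (any of which may be
picked as a recovery source), and tracks recoveries that are interrupted while blocked so that
their indices can still be asserted as closed.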

Tanguy Leroux
commit f15be14ee3

+ 56 - 25   server/src/test/java/org/elasticsearch/indices/state/CloseWhileRelocatingShardsIT.java

@@ -18,8 +18,10 @@
  */
 package org.elasticsearch.indices.state;
 
+import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequest;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.IndexRoutingTable;
 import org.elasticsearch.cluster.routing.ShardRouting;
@@ -32,13 +34,14 @@ import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocation
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
+import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.indices.recovery.PeerRecoverySourceService;
 import org.elasticsearch.indices.recovery.StartRecoveryRequest;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.test.BackgroundIndexer;
 import org.elasticsearch.test.ESIntegTestCase;
-import org.elasticsearch.test.junit.annotations.TestLogging;
 import org.elasticsearch.test.transport.MockTransportService;
+import org.elasticsearch.test.transport.StubbableTransport;
 import org.elasticsearch.transport.TransportService;
 
 import java.util.ArrayList;
@@ -57,6 +60,7 @@ import static org.elasticsearch.indices.state.CloseIndexIT.assertIndexIsClosed;
 import static org.elasticsearch.indices.state.CloseIndexIT.assertIndexIsOpened;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.hasSize;
 
 @ESIntegTestCase.ClusterScope(minNumDataNodes = 2)
 public class CloseWhileRelocatingShardsIT extends ESIntegTestCase {
@@ -68,9 +72,11 @@ public class CloseWhileRelocatingShardsIT extends ESIntegTestCase {
 
     @Override
     protected Settings nodeSettings(int nodeOrdinal) {
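+        // Do not throttle recoveries: relocations of all test indices must be able to start concurrently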
+        final int maxRecoveries = Integer.MAX_VALUE;
         return Settings.builder()
             .put(super.nodeSettings(nodeOrdinal))
-            .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey(), Integer.MAX_VALUE)
+            .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey(), maxRecoveries)
+            .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING.getKey(), maxRecoveries)
             .put(ConcurrentRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING.getKey(), -1)
             .build();
     }
@@ -80,7 +86,6 @@ public class CloseWhileRelocatingShardsIT extends ESIntegTestCase {
         return 3;
     }
 
-    @TestLogging("org.elasticsearch.cluster.metadata.MetaDataIndexStateService:DEBUG,org.elasticsearch.action.admin.indices.close:DEBUG")
     public void testCloseWhileRelocatingShards() throws Exception {
         final String[] indices = new String[randomIntBetween(3, 5)];
         final Map<String, Long> docsPerIndex = new HashMap<>();
@@ -119,21 +124,19 @@ public class CloseWhileRelocatingShardsIT extends ESIntegTestCase {
 
         final String targetNode = internalCluster().startDataOnlyNode();
         ensureClusterSizeConsistency(); // wait for the master to finish processing join.
-        final MockTransportService targetTransportService =
-            (MockTransportService) internalCluster().getInstance(TransportService.class, targetNode);
 
-        final Set<String> acknowledgedCloses = ConcurrentCollections.newConcurrentSet();
         try {
             final ClusterService clusterService = internalCluster().getInstance(ClusterService.class, internalCluster().getMasterName());
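+            // Capture the cluster state once so shard selection and node resolution use a consistent view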
+            final ClusterState state = clusterService.state();
             final CountDownLatch latch = new CountDownLatch(indices.length);
-            final CountDownLatch release = new CountDownLatch(1);
+            final CountDownLatch release = new CountDownLatch(indices.length);
 
             // relocate one shard for every index to be closed
             final AllocationCommands commands = new AllocationCommands();
             for (final String index : indices) {
                 final NumShards numShards = getNumShards(index);
                 final int shardId = numShards.numPrimaries == 1 ? 0 : randomIntBetween(0, numShards.numPrimaries - 1);
-                final IndexRoutingTable indexRoutingTable = clusterService.state().routingTable().index(index);
+                final IndexRoutingTable indexRoutingTable = state.routingTable().index(index);
 
                 final ShardRouting primary = indexRoutingTable.shard(shardId).primaryShard();
                 assertTrue(primary.started());
@@ -146,24 +149,49 @@ public class CloseWhileRelocatingShardsIT extends ESIntegTestCase {
                         currentNodeId = replica.currentNodeId();
                     }
                 }
+                commands.add(new MoveAllocationCommand(index, shardId, state.nodes().resolveNode(currentNodeId).getName(), targetNode));
+            }
+
+            // Build the set of shards whose recoveries will be blocked
+            final Set<ShardId> blockedShards = commands.commands().stream()
+                .map(c -> (MoveAllocationCommand) c)
+                .map(c -> new ShardId(state.metaData().index(c.index()).getIndex(), c.shardId()))
+                .collect(Collectors.toSet());
+            assertThat(blockedShards, hasSize(indices.length));
+
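+            // Track acknowledged index closes and any recoveries interrupted while blocked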
+            final Set<String> acknowledgedCloses = ConcurrentCollections.newConcurrentSet();
+            final Set<String> interruptedRecoveries = ConcurrentCollections.newConcurrentSet();
 
-                final DiscoveryNode sourceNode = clusterService.state().nodes().resolveNode(primary.currentNodeId());
-                targetTransportService.addSendBehavior(internalCluster().getInstance(TransportService.class, sourceNode.getName()),
-                        (connection, requestId, action, request, options) -> {
-                            if (PeerRecoverySourceService.Actions.START_RECOVERY.equals(action)) {
-                                logger.debug("blocking recovery of shard {}", ((StartRecoveryRequest) request).shardId());
-                                latch.countDown();
-                                try {
-                                    release.await();
-                                    logger.debug("releasing recovery of shard {}", ((StartRecoveryRequest) request).shardId());
-                                } catch (InterruptedException e) {
-                                    throw new AssertionError(e);
-                                }
-                            }
-                            connection.sendRequest(requestId, action, request, options);
+            // Create a SendRequestBehavior that will block outgoing start recovery requests
+            final StubbableTransport.SendRequestBehavior sendBehavior = (connection, requestId, action, request, options) -> {
+                if (PeerRecoverySourceService.Actions.START_RECOVERY.equals(action)) {
+                    final StartRecoveryRequest startRecoveryRequest = (StartRecoveryRequest) request;
+                    if (blockedShards.contains(startRecoveryRequest.shardId())) {
+                        logger.debug("blocking recovery of shard {}", startRecoveryRequest.shardId());
+                        latch.countDown();
+                        try {
+                            release.await();
+                            logger.debug("releasing recovery of shard {}", startRecoveryRequest.shardId());
+                        } catch (final InterruptedException e) {
+                            logger.warn(() -> new ParameterizedMessage("exception when releasing recovery of shard {}",
+                                startRecoveryRequest.shardId()), e);
+                            interruptedRecoveries.add(startRecoveryRequest.shardId().getIndexName());
+                            Thread.currentThread().interrupt();
+                            return;
                         }
-                    );
-                commands.add(new MoveAllocationCommand(index, shardId, currentNodeId, targetNode));
+                    }
+                }
+                connection.sendRequest(requestId, action, request, options);
+            };
+
+            final MockTransportService targetTransportService =
+                (MockTransportService) internalCluster().getInstance(TransportService.class, targetNode);
+
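+            // Install the blocking behavior on the target's connections to every other data node,
+            // since any of them may be chosen as the recovery source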
+            for (DiscoveryNode node : state.getNodes()) {
+                if (node.isDataNode() && node.getName().equals(targetNode) == false) {
+                    final TransportService sourceTransportService = internalCluster().getInstance(TransportService.class, node.getName());
+                    targetTransportService.addSendBehavior(sourceTransportService, sendBehavior);
+                }
             }
 
             assertAcked(client().admin().cluster().reroute(new ClusterRerouteRequest().commands(commands)).get());
@@ -222,12 +250,15 @@ public class CloseWhileRelocatingShardsIT extends ESIntegTestCase {
 
             targetTransportService.clearAllRules();
 
+            // If a shard recovery has been interrupted, we expect its index to be closed
+            interruptedRecoveries.forEach(CloseIndexIT::assertIndexIsClosed);
+
             assertThat("Consider that the test failed if no indices were successfully closed", acknowledgedCloses.size(), greaterThan(0));
             assertAcked(client().admin().indices().prepareOpen("index-*"));
             ensureGreen(indices);
 
             for (String index : acknowledgedCloses) {
-                long docsCount = client().prepareSearch(index).setSize(0).get().getHits().getTotalHits().value;
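+                // Total hits must be tracked explicitly to obtain an exact doc count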
+                long docsCount = client().prepareSearch(index).setSize(0).setTrackTotalHits(true).get().getHits().getTotalHits().value;
                 assertEquals("Expected " + docsPerIndex.get(index) + " docs in index " + index + " but got " + docsCount
                     + " (close acknowledged=" + acknowledgedCloses.contains(index) + ")", (long) docsPerIndex.get(index), docsCount);
             }