浏览代码

Deduplicate Shard Started Requests (#82089)

Deduplicate shard started requests the same way we deduplicate shard-failed
and shard snapshot state updates already.

closes #81628
Armin Braun 3 年之前
父节点
当前提交
01debdc856

+ 37 - 13
server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java

@@ -79,9 +79,8 @@ public class ShardStateAction {
     private final ClusterService clusterService;
     private final ThreadPool threadPool;
 
-    // a list of shards that failed during replication
-    // we keep track of these shards in order to avoid sending duplicate failed shard requests for a single failing shard.
-    private final ResultDeduplicator<FailedShardEntry, Void> remoteFailedShardsDeduplicator = new ResultDeduplicator<>();
+    // we deduplicate these shard state requests in order to avoid sending duplicate failed/started shard requests for a shard
+    private final ResultDeduplicator<TransportRequest, Void> remoteShardStateUpdateDeduplicator = new ResultDeduplicator<>();
 
     @Inject
     public ShardStateAction(
@@ -196,15 +195,26 @@ public class ShardStateAction {
         ActionListener<Void> listener
     ) {
         assert primaryTerm > 0L : "primary term should be strictly positive";
-        remoteFailedShardsDeduplicator.executeOnce(
+        remoteShardStateUpdateDeduplicator.executeOnce(
             new FailedShardEntry(shardId, allocationId, primaryTerm, message, failure, markAsStale),
             listener,
             (req, reqListener) -> sendShardAction(SHARD_FAILED_ACTION_NAME, clusterService.state(), req, reqListener)
         );
     }
 
-    int remoteShardFailedCacheSize() {
-        return remoteFailedShardsDeduplicator.size();
+    int remoteShardRequestsInFlight() {
+        return remoteShardStateUpdateDeduplicator.size();
+    }
+
+    /**
+     * Clears out {@link #remoteShardStateUpdateDeduplicator}. Called by
+     * {@link org.elasticsearch.indices.cluster.IndicesClusterStateService} in case of a master failover to enable sending fresh requests
+     * to the new master right away on master failover.
+     * This method is best effort in so far that it might clear out valid requests in edge cases during master failover. This is not an
+     * issue functionally and merely results in some unnecessary transport requests.
+     */
+    public void clearRemoteShardRequestDeduplicator() {
+        remoteShardStateUpdateDeduplicator.clear();
     }
 
     /**
@@ -588,14 +598,11 @@ public class ShardStateAction {
         final ActionListener<Void> listener,
         final ClusterState currentState
     ) {
-        final StartedShardEntry entry = new StartedShardEntry(
-            shardRouting.shardId(),
-            shardRouting.allocationId().getId(),
-            primaryTerm,
-            message,
-            timestampRange
+        remoteShardStateUpdateDeduplicator.executeOnce(
+            new StartedShardEntry(shardRouting.shardId(), shardRouting.allocationId().getId(), primaryTerm, message, timestampRange),
+            listener,
+            (req, l) -> sendShardAction(SHARD_STARTED_ACTION_NAME, currentState, req, l)
         );
-        sendShardAction(SHARD_STARTED_ACTION_NAME, currentState, entry, listener);
     }
 
     private static class ShardStartedTransportHandler implements TransportRequestHandler<StartedShardEntry> {
@@ -842,6 +849,23 @@ public class ShardStateAction {
                 message
             );
         }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            StartedShardEntry that = (StartedShardEntry) o;
+            return primaryTerm == that.primaryTerm
+                && shardId.equals(that.shardId)
+                && allocationId.equals(that.allocationId)
+                && message.equals(that.message)
+                && timestampRange.equals(that.timestampRange);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(shardId, allocationId, primaryTerm, message, timestampRange);
+        }
     }
 
     public static class NoLongerPrimaryShardException extends ElasticsearchException {

+ 7 - 0
server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java

@@ -197,6 +197,13 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
 
         final ClusterState state = event.state();
 
+        final DiscoveryNode currentMaster = state.nodes().getMasterNode();
+        if (currentMaster != null && currentMaster.equals(event.previousState().nodes().getMasterNode()) == false) {
+            // master node changed, clear request deduplicator so we send out new state update requests right away without waiting for
+            // the in-flight ones to fail first
+            shardStateAction.clearRemoteShardRequestDeduplicator();
+        }
+
         // we need to clean the shards and indices we have on this node, since we
         // are going to recover them again once state persistence is disabled (no master / not recovered)
         // TODO: feels hacky, a block disables state persistence, and then we clean the allocated shards, maybe another flag in blocks?

+ 35 - 1
server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java

@@ -149,7 +149,7 @@ public class ShardStateActionTests extends ESTestCase {
         clusterService.close();
         transportService.close();
         super.tearDown();
-        assertThat(shardStateAction.remoteShardFailedCacheSize(), equalTo(0));
+        assertThat(shardStateAction.remoteShardRequestsInFlight(), equalTo(0));
     }
 
     @AfterClass
@@ -382,6 +382,40 @@ public class ShardStateActionTests extends ESTestCase {
         assertThat(transport.capturedRequests(), arrayWithSize(0));
     }
 
+    public void testDeduplicateRemoteShardStarted() throws InterruptedException {
+        final String index = "test";
+        setState(clusterService, ClusterStateCreationUtils.stateWithActivePrimary(index, true, randomInt(5)));
+        ShardRouting startedShard = getRandomShardRouting(index);
+        int numListeners = between(1, 100);
+        CountDownLatch latch = new CountDownLatch(numListeners);
+        long primaryTerm = randomLongBetween(1, Long.MAX_VALUE);
+        int expectedRequests = 1;
+        for (int i = 0; i < numListeners; i++) {
+            if (rarely() && i > 0) {
+                expectedRequests++;
+                shardStateAction.clearRemoteShardRequestDeduplicator();
+            }
+            shardStateAction.shardStarted(startedShard, primaryTerm, "started", ShardLongFieldRange.EMPTY, new ActionListener<>() {
+                @Override
+                public void onResponse(Void aVoid) {
+                    latch.countDown();
+                }
+
+                @Override
+                public void onFailure(Exception e) {
+                    latch.countDown();
+                }
+            });
+        }
+        CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear();
+        assertThat(capturedRequests, arrayWithSize(expectedRequests));
+        for (int i = 0; i < expectedRequests; i++) {
+            transport.handleResponse(capturedRequests[i].requestId, TransportResponse.Empty.INSTANCE);
+        }
+        latch.await();
+        assertThat(transport.capturedRequests(), arrayWithSize(0));
+    }
+
     public void testRemoteShardFailedConcurrently() throws Exception {
         final String index = "test";
         final AtomicBoolean shutdown = new AtomicBoolean(false);