Răsfoiți Sursa

Speed up snapshot shards service by avoiding looping all shards needlessly (#90057)

Store a flag to indicate whether or not we have INIT stage shards in
a snapshot to avoid looping over those snapshots that don't needlessly
in the snapshot shards service.
We loop all shards when constructing the snapshot entry anyway so this
is almost free when constructing the entry, but saves considerable CPU
time when there's many queued up snapshots during CS application.
Armin Braun 3 ani în urmă
părinte
comite
3446853aa3

+ 43 - 8
server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java

@@ -686,6 +686,13 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
         @Nullable
         private final String failure;
 
+        /**
+         * Flag set to true in case any of the shard snapshots in {@link #shards} are in state {@link ShardState#INIT}.
+         * This is used by data nodes to determine if there is any work to be done on a snapshot by them without having to iterate
+         * the full {@link #shards} map.
+         */
+        private final boolean hasShardsInInitState;
+
         // visible for testing, use #startedEntry and copy constructors in production code
         public static Entry snapshot(
             Snapshot snapshot,
@@ -704,13 +711,16 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
         ) {
             final Map<String, Index> res = Maps.newMapWithExpectedSize(indices.size());
             final Map<RepositoryShardId, ShardSnapshotStatus> byRepoShardIdBuilder = Maps.newHashMapWithExpectedSize(shards.size());
+            boolean hasInitStateShards = false;
             for (Map.Entry<ShardId, ShardSnapshotStatus> entry : shards.entrySet()) {
                 final ShardId shardId = entry.getKey();
                 final IndexId indexId = indices.get(shardId.getIndexName());
                 final Index index = shardId.getIndex();
                 final Index existing = res.put(indexId.getName(), index);
                 assert existing == null || existing.equals(index) : "Conflicting indices [" + existing + "] and [" + index + "]";
-                byRepoShardIdBuilder.put(new RepositoryShardId(indexId, shardId.id()), entry.getValue());
+                final var shardSnapshotStatus = entry.getValue();
+                hasInitStateShards |= shardSnapshotStatus.state() == ShardState.INIT;
+                byRepoShardIdBuilder.put(new RepositoryShardId(indexId, shardId.id()), shardSnapshotStatus);
             }
             return new Entry(
                 snapshot,
@@ -728,7 +738,8 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
                 version,
                 null,
                 byRepoShardIdBuilder,
-                res
+                res,
+                hasInitStateShards
             );
         }
 
@@ -759,7 +770,8 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
                 version,
                 source,
                 shardStatusByRepoShardId,
-                Map.of()
+                Map.of(),
+                false
             );
         }
 
@@ -779,7 +791,8 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
             Version version,
             @Nullable SnapshotId source,
             Map<RepositoryShardId, ShardSnapshotStatus> shardStatusByRepoShardId,
-            Map<String, Index> snapshotIndices
+            Map<String, Index> snapshotIndices,
+            boolean hasShardsInInitState
         ) {
             this.state = state;
             this.snapshot = snapshot;
@@ -797,7 +810,15 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
             this.source = source;
             this.shardStatusByRepoShardId = Map.copyOf(shardStatusByRepoShardId);
             this.snapshotIndices = snapshotIndices;
-            assert assertShardsConsistent(this.source, this.state, this.indices, this.shards, this.shardStatusByRepoShardId);
+            this.hasShardsInInitState = hasShardsInInitState;
+            assert assertShardsConsistent(
+                this.source,
+                this.state,
+                this.indices,
+                this.shards,
+                this.shardStatusByRepoShardId,
+                this.hasShardsInInitState
+            );
         }
 
         private static Entry readFrom(StreamInput in) throws IOException {
@@ -856,7 +877,8 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
             State state,
             Map<String, IndexId> indices,
             Map<ShardId, ShardSnapshotStatus> shards,
-            Map<RepositoryShardId, ShardSnapshotStatus> statusByRepoShardId
+            Map<RepositoryShardId, ShardSnapshotStatus> statusByRepoShardId,
+            boolean hasInitStateShards
         ) {
             if ((state == State.INIT || state == State.ABORTED) && shards.isEmpty()) {
                 return true;
@@ -883,12 +905,15 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
             }
             if (source == null) {
                 assert shards.size() == statusByRepoShardId.size();
+                boolean foundInitStateShard = false;
                 for (Map.Entry<ShardId, ShardSnapshotStatus> entry : shards.entrySet()) {
+                    foundInitStateShard |= entry.getValue().state() == ShardState.INIT;
                     final ShardId routingShardId = entry.getKey();
                     assert statusByRepoShardId.get(
                         new RepositoryShardId(indices.get(routingShardId.getIndexName()), routingShardId.id())
                     ) == entry.getValue() : "found inconsistent values tracked by routing- and repository shard id";
                 }
+                assert foundInitStateShard == hasInitStateShards : "init shard state flag does not match shard states";
             }
             return true;
         }
@@ -912,7 +937,8 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
                 version,
                 source,
                 shardStatusByRepoShardId,
-                snapshotIndices
+                snapshotIndices,
+                hasShardsInInitState
             );
         }
 
@@ -1130,6 +1156,14 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
             return userMetadata;
         }
 
+        /**
+         * See {@link #hasShardsInInitState}.
+         * @return true if this entry can contain shard snapshots that have yet to be started on a data node.
+         */
+        public boolean hasShardsInInitState() {
+            return hasShardsInInitState;
+        }
+
         public boolean partial() {
             return partial;
         }
@@ -1514,7 +1548,8 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
                     part.version,
                     null,
                     part.shardStatusByRepoShardId,
-                    part.snapshotIndices
+                    part.snapshotIndices,
+                    part.hasShardsInInitState
                 );
             }
             if (part.isClone()) {

+ 1 - 1
server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java

@@ -206,7 +206,7 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
                 // This is a snapshot clone, it will be executed on the current master
                 continue;
             }
-            if (entryState == State.STARTED) {
+            if (entryState == State.STARTED && entry.hasShardsInInitState()) {
                 Map<ShardId, IndexShardSnapshotStatus> startedShards = null;
                 final Snapshot snapshot = entry.snapshot();
                 Map<ShardId, IndexShardSnapshotStatus> snapshotShards = shardSnapshots.getOrDefault(snapshot, emptyMap());