Browse Source

Prepare Snapshot Shard State Update Logic For Clone Logic (#62617)

Small refactoring to shorten the diff with the clone logic in #61839:

* Since clones will create a different kind of shard state update that
isn't the same request sent by the snapshot shards service (and cannot be
the same request because we have no `ShardId`) base the shard state updates
on a different class that can be extended to be general enough to accomodate
shard clones as well.
* Make the update executor a singleton (can't make it an inline lambda as that
would break CS update batching because the executor is used as a map key but
this change still makes it crystal clear that there's no internal state to the
executor)
* Make shard state update responses a singleton (can't use TransportResponse.Empty because
we need an action response but still it makes it clear that there's no actual
response with content here)
Armin Braun 5 years ago
parent
commit
6ab28e5b22

+ 2 - 2
server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java

@@ -450,8 +450,8 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
                     SnapshotsService.UPDATE_SNAPSHOT_STATUS_ACTION_NAME, req,
                 new TransportResponseHandler<UpdateIndexShardSnapshotStatusResponse>() {
                     @Override
-                    public UpdateIndexShardSnapshotStatusResponse read(StreamInput in) throws IOException {
-                        return new UpdateIndexShardSnapshotStatusResponse(in);
+                    public UpdateIndexShardSnapshotStatusResponse read(StreamInput in) {
+                        return UpdateIndexShardSnapshotStatusResponse.INSTANCE;
                     }
 
                     @Override

+ 111 - 82
server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

@@ -152,7 +152,6 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
     // Set of snapshots that are currently being ended by this node
     private final Set<Snapshot> endingSnapshots = Collections.synchronizedSet(new HashSet<>());
 
-    private final SnapshotStateExecutor snapshotStateExecutor = new SnapshotStateExecutor();
     private final UpdateSnapshotStatusAction updateSnapshotStatusHandler;
 
     private final TransportService transportService;
@@ -1909,101 +1908,130 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
         return true;
     }
 
-    private static class SnapshotStateExecutor implements ClusterStateTaskExecutor<UpdateIndexShardSnapshotStatusRequest> {
-
-        @Override
-        public ClusterTasksResult<UpdateIndexShardSnapshotStatusRequest>
-                        execute(ClusterState currentState, List<UpdateIndexShardSnapshotStatusRequest> tasks) {
-            int changedCount = 0;
-            int startedCount = 0;
-            final List<SnapshotsInProgress.Entry> entries = new ArrayList<>();
-            // Tasks to check for updates for running snapshots.
-            final List<UpdateIndexShardSnapshotStatusRequest> unconsumedTasks = new ArrayList<>(tasks);
-            // Tasks that were used to complete an existing in-progress shard snapshot
-            final Set<UpdateIndexShardSnapshotStatusRequest> executedTasks = new HashSet<>();
-            for (SnapshotsInProgress.Entry entry : currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY).entries()) {
-                if (entry.state().completed()) {
-                    entries.add(entry);
+    private static final ClusterStateTaskExecutor<ShardSnapshotUpdate> SHARD_STATE_EXECUTOR = (currentState, tasks) -> {
+        int changedCount = 0;
+        int startedCount = 0;
+        final List<SnapshotsInProgress.Entry> entries = new ArrayList<>();
+        // Tasks to check for updates for running snapshots.
+        final List<ShardSnapshotUpdate> unconsumedTasks = new ArrayList<>(tasks);
+        // Tasks that were used to complete an existing in-progress shard snapshot
+        final Set<ShardSnapshotUpdate> executedTasks = new HashSet<>();
+        for (SnapshotsInProgress.Entry entry : currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY).entries()) {
+            if (entry.state().completed()) {
+                entries.add(entry);
+                continue;
+            }
+            ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> shards = null;
+            for (Iterator<ShardSnapshotUpdate> iterator = unconsumedTasks.iterator(); iterator.hasNext(); ) {
+                final ShardSnapshotUpdate updateSnapshotState = iterator.next();
+                final Snapshot updatedSnapshot = updateSnapshotState.snapshot;
+                final String updatedRepository = updatedSnapshot.getRepository();
+                if (entry.repository().equals(updatedRepository) == false) {
                     continue;
                 }
-                ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> shards = null;
-                for (Iterator<UpdateIndexShardSnapshotStatusRequest> iterator = unconsumedTasks.iterator(); iterator.hasNext(); ) {
-                    final UpdateIndexShardSnapshotStatusRequest updateSnapshotState = iterator.next();
-                    final Snapshot updatedSnapshot = updateSnapshotState.snapshot();
-                    final String updatedRepository = updatedSnapshot.getRepository();
-                    if (entry.repository().equals(updatedRepository) == false) {
+                final ShardId finishedShardId = updateSnapshotState.shardId;
+                if (entry.snapshot().getSnapshotId().equals(updatedSnapshot.getSnapshotId())) {
+                    final ShardSnapshotStatus existing = entry.shards().get(finishedShardId);
+                    if (existing == null) {
+                        logger.warn("Received shard snapshot status update [{}] but this shard is not tracked in [{}]",
+                                updateSnapshotState, entry);
+                        assert false : "This should never happen, data nodes should only send updates for expected shards";
                         continue;
                     }
-                    final ShardId finishedShardId = updateSnapshotState.shardId();
-                    if (entry.snapshot().getSnapshotId().equals(updatedSnapshot.getSnapshotId())) {
-                        final ShardSnapshotStatus existing = entry.shards().get(finishedShardId);
-                        if (existing == null) {
-                            logger.warn("Received shard snapshot status update [{}] but this shard is not tracked in [{}]",
-                                    updateSnapshotState, entry);
-                            assert false : "This should never happen, data nodes should only send updates for expected shards";
-                            continue;
-                        }
-                        if (existing.state().completed()) {
-                            // No point in doing noop updates that might happen if data nodes resends shard status after a disconnect.
-                            iterator.remove();
-                            continue;
-                        }
-                        logger.trace("[{}] Updating shard [{}] with status [{}]", updatedSnapshot,
-                                finishedShardId, updateSnapshotState.status().state());
-                        if (shards == null) {
-                            shards = ImmutableOpenMap.builder(entry.shards());
-                        }
-                        shards.put(finishedShardId, updateSnapshotState.status());
-                        executedTasks.add(updateSnapshotState);
-                        changedCount++;
-                    } else if (executedTasks.contains(updateSnapshotState)) {
-                        // tasks that completed a shard might allow starting a new shard snapshot for the current snapshot
-                        final ShardSnapshotStatus existingStatus = entry.shards().get(finishedShardId);
-                        if (existingStatus == null || existingStatus.state() != ShardState.QUEUED) {
-                            continue;
-                        }
-                        if (shards == null) {
-                            shards = ImmutableOpenMap.builder(entry.shards());
-                        }
-                        final ShardSnapshotStatus finishedStatus = updateSnapshotState.status();
-                        logger.trace("Starting [{}] on [{}] with generation [{}]", finishedShardId,
-                                finishedStatus.nodeId(), finishedStatus.generation());
-                        shards.put(finishedShardId, new ShardSnapshotStatus(finishedStatus.nodeId(), finishedStatus.generation()));
+                    if (existing.state().completed()) {
+                        // No point in doing noop updates that might happen if data nodes resends shard status after a disconnect.
                         iterator.remove();
-                        startedCount++;
+                        continue;
+                    }
+                    logger.trace("[{}] Updating shard [{}] with status [{}]", updatedSnapshot,
+                            finishedShardId, updateSnapshotState.updatedState.state());
+                    if (shards == null) {
+                        shards = ImmutableOpenMap.builder(entry.shards());
+                    }
+                    shards.put(finishedShardId, updateSnapshotState.updatedState);
+                    executedTasks.add(updateSnapshotState);
+                    changedCount++;
+                } else if (executedTasks.contains(updateSnapshotState)) {
+                    // tasks that completed a shard might allow starting a new shard snapshot for the current snapshot
+                    final ShardSnapshotStatus existingStatus = entry.shards().get(finishedShardId);
+                    if (existingStatus == null || existingStatus.state() != ShardState.QUEUED) {
+                        continue;
+                    }
+                    if (shards == null) {
+                        shards = ImmutableOpenMap.builder(entry.shards());
                     }
+                    final ShardSnapshotStatus finishedStatus = updateSnapshotState.updatedState;
+                    logger.trace("Starting [{}] on [{}] with generation [{}]", finishedShardId,
+                            finishedStatus.nodeId(), finishedStatus.generation());
+                    shards.put(finishedShardId, new ShardSnapshotStatus(finishedStatus.nodeId(), finishedStatus.generation()));
+                    iterator.remove();
+                    startedCount++;
                 }
+            }
 
-                if (shards == null) {
-                    entries.add(entry);
-                } else {
-                    entries.add(entry.withShardStates(shards.build()));
-                }
+            if (shards == null) {
+                entries.add(entry);
+            } else {
+                entries.add(entry.withShardStates(shards.build()));
             }
-            if (changedCount > 0) {
-                logger.trace("changed cluster state triggered by [{}] snapshot state updates and resulted in starting " +
-                        "[{}] shard snapshots", changedCount, startedCount);
-                return ClusterTasksResult.<UpdateIndexShardSnapshotStatusRequest>builder().successes(tasks)
-                        .build(ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE,
-                                SnapshotsInProgress.of(entries)).build());
+        }
+        if (changedCount > 0) {
+            logger.trace("changed cluster state triggered by [{}] snapshot state updates and resulted in starting " +
+                    "[{}] shard snapshots", changedCount, startedCount);
+            return ClusterStateTaskExecutor.ClusterTasksResult.<ShardSnapshotUpdate>builder().successes(tasks).build(
+                    ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, SnapshotsInProgress.of(entries)).build());
+        }
+        return ClusterStateTaskExecutor.ClusterTasksResult.<ShardSnapshotUpdate>builder().successes(tasks).build(currentState);
+    };
+
+    /**
+     * An update to the snapshot state of a shard.
+     */
+    private static final class ShardSnapshotUpdate {
+
+        private final Snapshot snapshot;
+
+        private final ShardId shardId;
+
+        private final ShardSnapshotStatus updatedState;
+
+        private ShardSnapshotUpdate(Snapshot snapshot, ShardId shardId, ShardSnapshotStatus updatedState) {
+            this.snapshot = snapshot;
+            this.shardId = shardId;
+            this.updatedState = updatedState;
+        }
+
+        @Override
+        public boolean equals(Object other) {
+            if (this == other) {
+                return true;
             }
-            return ClusterTasksResult.<UpdateIndexShardSnapshotStatusRequest>builder().successes(tasks).build(currentState);
+            if ((other instanceof ShardSnapshotUpdate) == false) {
+                return false;
+            }
+            final ShardSnapshotUpdate that = (ShardSnapshotUpdate) other;
+            return this.snapshot.equals(that.snapshot) && this.shardId.equals(that.shardId) && this.updatedState == that.updatedState;
+        }
+
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(snapshot, shardId, updatedState);
         }
     }
 
     /**
-     * Updates the shard status on master node
+     * Updates the shard status in the cluster state
      *
-     * @param request update shard status request
+     * @param update shard snapshot status update
      */
-    private void innerUpdateSnapshotState(final UpdateIndexShardSnapshotStatusRequest request,
-                                          ActionListener<UpdateIndexShardSnapshotStatusResponse> listener) {
-        logger.trace("received updated snapshot restore state [{}]", request);
+    private void innerUpdateSnapshotState(ShardSnapshotUpdate update, ActionListener<Void> listener) {
+        logger.trace("received updated snapshot restore state [{}]", update);
         clusterService.submitStateUpdateTask(
                 "update snapshot state",
-                request,
+                update,
                 ClusterStateTaskConfig.build(Priority.NORMAL),
-                snapshotStateExecutor,
+                SHARD_STATE_EXECUTOR,
                 new ClusterStateTaskListener() {
                     @Override
                     public void onFailure(String source, Exception e) {
@@ -2013,13 +2041,13 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
                     @Override
                     public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
                         try {
-                            listener.onResponse(new UpdateIndexShardSnapshotStatusResponse());
+                            listener.onResponse(null);
                         } finally {
                             // Maybe this state update completed the snapshot. If we are not already ending it because of a concurrent
                             // state update we check if its state is completed and end it if it is.
-                            if (endingSnapshots.contains(request.snapshot()) == false) {
+                            if (endingSnapshots.contains(update.snapshot) == false) {
                                 final SnapshotsInProgress snapshotsInProgress = newState.custom(SnapshotsInProgress.TYPE);
-                                final SnapshotsInProgress.Entry updatedEntry = snapshotsInProgress.snapshot(request.snapshot());
+                                final SnapshotsInProgress.Entry updatedEntry = snapshotsInProgress.snapshot(update.snapshot);
                                 // If the entry is still in the cluster state and is completed, try finalizing the snapshot in the repo
                                 if (updatedEntry != null && updatedEntry.state().completed()) {
                                     endSnapshot(updatedEntry, newState.metadata(), null);
@@ -2047,13 +2075,14 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
 
         @Override
         protected UpdateIndexShardSnapshotStatusResponse read(StreamInput in) throws IOException {
-            return new UpdateIndexShardSnapshotStatusResponse(in);
+            return UpdateIndexShardSnapshotStatusResponse.INSTANCE;
         }
 
         @Override
         protected void masterOperation(Task task, UpdateIndexShardSnapshotStatusRequest request, ClusterState state,
                                        ActionListener<UpdateIndexShardSnapshotStatusResponse> listener) {
-            innerUpdateSnapshotState(request, listener);
+            innerUpdateSnapshotState(new ShardSnapshotUpdate(request.snapshot(), request.shardId(), request.status()),
+                    ActionListener.delegateFailure(listener, (l, v) -> l.onResponse(UpdateIndexShardSnapshotStatusResponse.INSTANCE)));
         }
 
         @Override

+ 2 - 5
server/src/main/java/org/elasticsearch/snapshots/UpdateIndexShardSnapshotStatusResponse.java

@@ -19,18 +19,15 @@
 package org.elasticsearch.snapshots;
 
 import org.elasticsearch.action.ActionResponse;
-import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 
 import java.io.IOException;
 
 class UpdateIndexShardSnapshotStatusResponse extends ActionResponse {
 
-    UpdateIndexShardSnapshotStatusResponse() {}
+    public static final UpdateIndexShardSnapshotStatusResponse INSTANCE = new UpdateIndexShardSnapshotStatusResponse();
 
-    UpdateIndexShardSnapshotStatusResponse(StreamInput in) throws IOException {
-        super(in);
-    }
+    private UpdateIndexShardSnapshotStatusResponse() {}
 
     @Override
     public void writeTo(StreamOutput out) throws IOException {}