|
@@ -163,7 +163,7 @@ public class RestoreService implements ClusterStateApplier {
|
|
|
this.metaDataIndexUpgradeService = metaDataIndexUpgradeService;
|
|
|
clusterService.addStateApplier(this);
|
|
|
this.clusterSettings = clusterSettings;
|
|
|
- this.cleanRestoreStateTaskExecutor = new CleanRestoreStateTaskExecutor(logger);
|
|
|
+ this.cleanRestoreStateTaskExecutor = new CleanRestoreStateTaskExecutor();
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -221,7 +221,7 @@ public class RestoreService implements ClusterStateApplier {
|
|
|
// Now we can start the actual restore process by adding shards to be recovered in the cluster state
|
|
|
// and updating cluster metadata (global and index) as needed
|
|
|
clusterService.submitStateUpdateTask("restore_snapshot[" + snapshotName + ']', new ClusterStateUpdateTask() {
|
|
|
- String restoreUUID = UUIDs.randomBase64UUID();
|
|
|
+ final String restoreUUID = UUIDs.randomBase64UUID();
|
|
|
RestoreInfo restoreInfo = null;
|
|
|
|
|
|
@Override
|
|
@@ -354,7 +354,7 @@ public class RestoreService implements ClusterStateApplier {
|
|
|
shards = shardsBuilder.build();
|
|
|
RestoreInProgress.Entry restoreEntry = new RestoreInProgress.Entry(
|
|
|
restoreUUID, snapshot, overallState(RestoreInProgress.State.INIT, shards),
|
|
|
- Collections.unmodifiableList(new ArrayList<>(indices.keySet())),
|
|
|
+ List.copyOf(indices.keySet()),
|
|
|
shards
|
|
|
);
|
|
|
RestoreInProgress.Builder restoreInProgressBuilder;
|
|
@@ -397,7 +397,7 @@ public class RestoreService implements ClusterStateApplier {
|
|
|
if (completed(shards)) {
|
|
|
// We don't have any indices to restore - we are done
|
|
|
restoreInfo = new RestoreInfo(snapshotId.getName(),
|
|
|
- Collections.unmodifiableList(new ArrayList<>(indices.keySet())),
|
|
|
+ List.copyOf(indices.keySet()),
|
|
|
shards.size(),
|
|
|
shards.size() - failedShards(shards));
|
|
|
}
|
|
@@ -595,7 +595,8 @@ public class RestoreService implements ClusterStateApplier {
|
|
|
}
|
|
|
|
|
|
public static class RestoreInProgressUpdater extends RoutingChangesObserver.AbstractRoutingChangesObserver {
|
|
|
- private final Map<String, Updates> shardChanges = new HashMap<>();
|
|
|
+ // Map of RestoreUUID to a of changes to the shards' restore statuses
|
|
|
+ private final Map<String, Map<ShardId, ShardRestoreStatus>> shardChanges = new HashMap<>();
|
|
|
|
|
|
@Override
|
|
|
public void shardStarted(ShardRouting initializingShard, ShardRouting startedShard) {
|
|
@@ -603,7 +604,7 @@ public class RestoreService implements ClusterStateApplier {
|
|
|
if (initializingShard.primary()) {
|
|
|
RecoverySource recoverySource = initializingShard.recoverySource();
|
|
|
if (recoverySource.getType() == RecoverySource.Type.SNAPSHOT) {
|
|
|
- changes(recoverySource).shards.put(
|
|
|
+ changes(recoverySource).put(
|
|
|
initializingShard.shardId(),
|
|
|
new ShardRestoreStatus(initializingShard.currentNodeId(), RestoreInProgress.State.SUCCESS));
|
|
|
}
|
|
@@ -619,7 +620,7 @@ public class RestoreService implements ClusterStateApplier {
|
|
|
// to restore this shard on another node if the snapshot files are corrupt. In case where a node just left or crashed,
|
|
|
// however, we only want to acknowledge the restore operation once it has been successfully restored on another node.
|
|
|
if (unassignedInfo.getFailure() != null && Lucene.isCorruptionException(unassignedInfo.getFailure().getCause())) {
|
|
|
- changes(recoverySource).shards.put(
|
|
|
+ changes(recoverySource).put(
|
|
|
failedShard.shardId(), new ShardRestoreStatus(failedShard.currentNodeId(),
|
|
|
RestoreInProgress.State.FAILURE, unassignedInfo.getFailure().getCause().getMessage()));
|
|
|
}
|
|
@@ -632,7 +633,7 @@ public class RestoreService implements ClusterStateApplier {
|
|
|
// if we force an empty primary, we should also fail the restore entry
|
|
|
if (unassignedShard.recoverySource().getType() == RecoverySource.Type.SNAPSHOT &&
|
|
|
initializedShard.recoverySource().getType() != RecoverySource.Type.SNAPSHOT) {
|
|
|
- changes(unassignedShard.recoverySource()).shards.put(
|
|
|
+ changes(unassignedShard.recoverySource()).put(
|
|
|
unassignedShard.shardId(),
|
|
|
new ShardRestoreStatus(null, RestoreInProgress.State.FAILURE,
|
|
|
"recovery source type changed from snapshot to " + initializedShard.recoverySource())
|
|
@@ -646,7 +647,7 @@ public class RestoreService implements ClusterStateApplier {
|
|
|
if (recoverySource.getType() == RecoverySource.Type.SNAPSHOT) {
|
|
|
if (newUnassignedInfo.getLastAllocationStatus() == UnassignedInfo.AllocationStatus.DECIDERS_NO) {
|
|
|
String reason = "shard could not be allocated to any of the nodes";
|
|
|
- changes(recoverySource).shards.put(
|
|
|
+ changes(recoverySource).put(
|
|
|
unassignedShard.shardId(),
|
|
|
new ShardRestoreStatus(unassignedShard.currentNodeId(), RestoreInProgress.State.FAILURE, reason));
|
|
|
}
|
|
@@ -657,24 +658,20 @@ public class RestoreService implements ClusterStateApplier {
|
|
|
* Helper method that creates update entry for the given recovery source's restore uuid
|
|
|
* if such an entry does not exist yet.
|
|
|
*/
|
|
|
- private Updates changes(RecoverySource recoverySource) {
|
|
|
+ private Map<ShardId, ShardRestoreStatus> changes(RecoverySource recoverySource) {
|
|
|
assert recoverySource.getType() == RecoverySource.Type.SNAPSHOT;
|
|
|
- return shardChanges.computeIfAbsent(((SnapshotRecoverySource) recoverySource).restoreUUID(), k -> new Updates());
|
|
|
- }
|
|
|
-
|
|
|
- private static class Updates {
|
|
|
- private Map<ShardId, ShardRestoreStatus> shards = new HashMap<>();
|
|
|
+ return shardChanges.computeIfAbsent(((SnapshotRecoverySource) recoverySource).restoreUUID(), k -> new HashMap<>());
|
|
|
}
|
|
|
|
|
|
public RestoreInProgress applyChanges(final RestoreInProgress oldRestore) {
|
|
|
if (shardChanges.isEmpty() == false) {
|
|
|
RestoreInProgress.Builder builder = new RestoreInProgress.Builder();
|
|
|
for (RestoreInProgress.Entry entry : oldRestore) {
|
|
|
- Updates updates = shardChanges.get(entry.uuid());
|
|
|
+ Map<ShardId, ShardRestoreStatus> updates = shardChanges.get(entry.uuid());
|
|
|
ImmutableOpenMap<ShardId, ShardRestoreStatus> shardStates = entry.shards();
|
|
|
- if (updates != null && updates.shards.isEmpty() == false) {
|
|
|
+ if (updates != null && updates.isEmpty() == false) {
|
|
|
ImmutableOpenMap.Builder<ShardId, ShardRestoreStatus> shardsBuilder = ImmutableOpenMap.builder(shardStates);
|
|
|
- for (Map.Entry<ShardId, ShardRestoreStatus> shard : updates.shards.entrySet()) {
|
|
|
+ for (Map.Entry<ShardId, ShardRestoreStatus> shard : updates.entrySet()) {
|
|
|
ShardId shardId = shard.getKey();
|
|
|
ShardRestoreStatus status = shardStates.get(shardId);
|
|
|
if (status == null || status.state().completed() == false) {
|
|
@@ -720,14 +717,8 @@ public class RestoreService implements ClusterStateApplier {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private final Logger logger;
|
|
|
-
|
|
|
- CleanRestoreStateTaskExecutor(Logger logger) {
|
|
|
- this.logger = logger;
|
|
|
- }
|
|
|
-
|
|
|
@Override
|
|
|
- public ClusterTasksResult<Task> execute(final ClusterState currentState, final List<Task> tasks) throws Exception {
|
|
|
+ public ClusterTasksResult<Task> execute(final ClusterState currentState, final List<Task> tasks) {
|
|
|
final ClusterTasksResult.Builder<Task> resultBuilder = ClusterTasksResult.<Task>builder().successes(tasks);
|
|
|
Set<String> completedRestores = tasks.stream().map(e -> e.uuid).collect(Collectors.toSet());
|
|
|
RestoreInProgress.Builder restoreInProgressBuilder = new RestoreInProgress.Builder();
|
|
@@ -782,8 +773,8 @@ public class RestoreService implements ClusterStateApplier {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public static RestoreInProgress.State overallState(RestoreInProgress.State nonCompletedState,
|
|
|
- ImmutableOpenMap<ShardId, RestoreInProgress.ShardRestoreStatus> shards) {
|
|
|
+ private static RestoreInProgress.State overallState(RestoreInProgress.State nonCompletedState,
|
|
|
+ ImmutableOpenMap<ShardId, RestoreInProgress.ShardRestoreStatus> shards) {
|
|
|
boolean hasFailed = false;
|
|
|
for (ObjectCursor<RestoreInProgress.ShardRestoreStatus> status : shards.values()) {
|
|
|
if (!status.value.state().completed()) {
|
|
@@ -819,7 +810,7 @@ public class RestoreService implements ClusterStateApplier {
|
|
|
return failedShards;
|
|
|
}
|
|
|
|
|
|
- private Map<String, String> renamedIndices(RestoreSnapshotRequest request, List<String> filteredIndices) {
|
|
|
+ private static Map<String, String> renamedIndices(RestoreSnapshotRequest request, List<String> filteredIndices) {
|
|
|
Map<String, String> renamedIndices = new HashMap<>();
|
|
|
for (String index : filteredIndices) {
|
|
|
String renamedIndex = index;
|
|
@@ -841,7 +832,7 @@ public class RestoreService implements ClusterStateApplier {
|
|
|
* @param repository repository name
|
|
|
* @param snapshotInfo snapshot metadata
|
|
|
*/
|
|
|
- private void validateSnapshotRestorable(final String repository, final SnapshotInfo snapshotInfo) {
|
|
|
+ private static void validateSnapshotRestorable(final String repository, final SnapshotInfo snapshotInfo) {
|
|
|
if (!snapshotInfo.state().restorable()) {
|
|
|
throw new SnapshotRestoreException(new Snapshot(repository, snapshotInfo.snapshotId()),
|
|
|
"unsupported snapshot state [" + snapshotInfo.state() + "]");
|
|
@@ -853,7 +844,7 @@ public class RestoreService implements ClusterStateApplier {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private boolean failed(SnapshotInfo snapshot, String index) {
|
|
|
+ private static boolean failed(SnapshotInfo snapshot, String index) {
|
|
|
for (SnapshotShardFailure failure : snapshot.shardFailures()) {
|
|
|
if (index.equals(failure.index())) {
|
|
|
return true;
|