|
@@ -13,6 +13,7 @@ import org.elasticsearch.action.ActionListener;
|
|
|
import org.elasticsearch.action.FailedNodeException;
|
|
|
import org.elasticsearch.client.Client;
|
|
|
import org.elasticsearch.cluster.ClusterState;
|
|
|
+import org.elasticsearch.cluster.RestoreInProgress;
|
|
|
import org.elasticsearch.cluster.metadata.IndexMetadata;
|
|
|
import org.elasticsearch.cluster.metadata.RepositoriesMetadata;
|
|
|
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
|
|
@@ -136,52 +137,61 @@ public class SearchableSnapshotAllocator implements ExistingShardsAllocator {
|
|
|
UnassignedAllocationHandler unassignedAllocationHandler
|
|
|
) {
|
|
|
// TODO: cancel and jump to better available allocations?
|
|
|
- if (shardRouting.primary()
|
|
|
- && (shardRouting.recoverySource().getType() == RecoverySource.Type.EXISTING_STORE
|
|
|
- || shardRouting.recoverySource().getType() == RecoverySource.Type.EMPTY_STORE)) {
|
|
|
- // we always force snapshot recovery source to use the snapshot-based recovery process on the node
|
|
|
- final Settings indexSettings = allocation.metadata().index(shardRouting.index()).getSettings();
|
|
|
- final IndexId indexId = new IndexId(
|
|
|
- SNAPSHOT_INDEX_NAME_SETTING.get(indexSettings),
|
|
|
- SNAPSHOT_INDEX_ID_SETTING.get(indexSettings)
|
|
|
- );
|
|
|
- final SnapshotId snapshotId = new SnapshotId(
|
|
|
- SNAPSHOT_SNAPSHOT_NAME_SETTING.get(indexSettings),
|
|
|
- SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSettings)
|
|
|
- );
|
|
|
+ if (shardRouting.primary()) {
|
|
|
+ final String recoveryUuid = getRecoverySourceRestoreUuid(shardRouting, allocation);
|
|
|
+ if (recoveryUuid != null) {
|
|
|
+
|
|
|
+ // we always force snapshot recovery source to use the snapshot-based recovery process on the node
|
|
|
+ final Settings indexSettings = allocation.metadata().index(shardRouting.index()).getSettings();
|
|
|
+ final IndexId indexId = new IndexId(
|
|
|
+ SNAPSHOT_INDEX_NAME_SETTING.get(indexSettings),
|
|
|
+ SNAPSHOT_INDEX_ID_SETTING.get(indexSettings)
|
|
|
+ );
|
|
|
+ final SnapshotId snapshotId = new SnapshotId(
|
|
|
+ SNAPSHOT_SNAPSHOT_NAME_SETTING.get(indexSettings),
|
|
|
+ SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSettings)
|
|
|
+ );
|
|
|
|
|
|
- final String repositoryUuid = SNAPSHOT_REPOSITORY_UUID_SETTING.get(indexSettings);
|
|
|
- final String repositoryName;
|
|
|
- if (Strings.hasLength(repositoryUuid) == false) {
|
|
|
- repositoryName = SNAPSHOT_REPOSITORY_NAME_SETTING.get(indexSettings);
|
|
|
- } else {
|
|
|
- final RepositoriesMetadata repoMetadata = allocation.metadata().custom(RepositoriesMetadata.TYPE);
|
|
|
- final List<RepositoryMetadata> repositories = repoMetadata == null ? emptyList() : repoMetadata.repositories();
|
|
|
- repositoryName = repositories.stream()
|
|
|
- .filter(r -> repositoryUuid.equals(r.uuid()))
|
|
|
- .map(RepositoryMetadata::name)
|
|
|
- .findFirst()
|
|
|
- .orElse(null);
|
|
|
- }
|
|
|
+ final String repositoryUuid = SNAPSHOT_REPOSITORY_UUID_SETTING.get(indexSettings);
|
|
|
+ final String repositoryName;
|
|
|
+ if (Strings.hasLength(repositoryUuid) == false) {
|
|
|
+ repositoryName = SNAPSHOT_REPOSITORY_NAME_SETTING.get(indexSettings);
|
|
|
+ } else {
|
|
|
+ final RepositoriesMetadata repoMetadata = allocation.metadata().custom(RepositoriesMetadata.TYPE);
|
|
|
+ final List<RepositoryMetadata> repositories = repoMetadata == null ? emptyList() : repoMetadata.repositories();
|
|
|
+ repositoryName = repositories.stream()
|
|
|
+ .filter(r -> repositoryUuid.equals(r.uuid()))
|
|
|
+ .map(RepositoryMetadata::name)
|
|
|
+ .findFirst()
|
|
|
+ .orElse(null);
|
|
|
+ }
|
|
|
|
|
|
- if (repositoryName == null) {
|
|
|
- // TODO if the repository we seek appears later, we will need to get its UUID (usually automatic) and then reroute
|
|
|
- unassignedAllocationHandler.removeAndIgnore(UnassignedInfo.AllocationStatus.DECIDERS_NO, allocation.changes());
|
|
|
- return;
|
|
|
- }
|
|
|
+ if (repositoryName == null) {
|
|
|
+ unassignedAllocationHandler.removeAndIgnore(UnassignedInfo.AllocationStatus.DECIDERS_NO, allocation.changes());
|
|
|
+ return;
|
|
|
+ }
|
|
|
|
|
|
- final Snapshot snapshot = new Snapshot(repositoryName, snapshotId);
|
|
|
+ final Snapshot snapshot = new Snapshot(repositoryName, snapshotId);
|
|
|
|
|
|
- shardRouting = unassignedAllocationHandler.updateUnassigned(
|
|
|
- shardRouting.unassignedInfo(),
|
|
|
- new RecoverySource.SnapshotRecoverySource(
|
|
|
- RecoverySource.SnapshotRecoverySource.NO_API_RESTORE_UUID,
|
|
|
+ final Version version = shardRouting.recoverySource().getType() == RecoverySource.Type.SNAPSHOT
|
|
|
+ ? ((RecoverySource.SnapshotRecoverySource) shardRouting.recoverySource()).version()
|
|
|
+ : Version.CURRENT;
|
|
|
+
|
|
|
+ final RecoverySource.SnapshotRecoverySource recoverySource = new RecoverySource.SnapshotRecoverySource(
|
|
|
+ recoveryUuid,
|
|
|
snapshot,
|
|
|
- Version.CURRENT,
|
|
|
+ version,
|
|
|
indexId
|
|
|
- ),
|
|
|
- allocation.changes()
|
|
|
- );
|
|
|
+ );
|
|
|
+
|
|
|
+ if (shardRouting.recoverySource().equals(recoverySource) == false) {
|
|
|
+ shardRouting = unassignedAllocationHandler.updateUnassigned(
|
|
|
+ shardRouting.unassignedInfo(),
|
|
|
+ recoverySource,
|
|
|
+ allocation.changes()
|
|
|
+ );
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
final AllocateUnassignedDecision allocateUnassignedDecision = decideAllocation(allocation, shardRouting);
|
|
@@ -207,6 +217,60 @@ public class SearchableSnapshotAllocator implements ExistingShardsAllocator {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * @return the restore UUID to use when adjusting the recovery source of the shard to be a snapshot recovery source from a
|
|
|
+ * repository of the correct name: the shard's existing restore UUID while its own restore is still pending for this shard, otherwise {@code NO_API_RESTORE_UUID}. Returns {@code null} if the recovery source should not be adjusted (any type other than EXISTING_STORE, EMPTY_STORE or SNAPSHOT).
|
|
|
+ */
|
|
|
+ @Nullable
|
|
|
+ private static String getRecoverySourceRestoreUuid(ShardRouting shardRouting, RoutingAllocation allocation) {
|
|
|
+ switch (shardRouting.recoverySource().getType()) {
|
|
|
+ case EXISTING_STORE:
|
|
|
+ case EMPTY_STORE:
|
|
|
+ // this shard previously failed and/or was force-allocated, so the recovery source must be changed to reflect that it will
|
|
|
+ // be recovered from the snapshot again
|
|
|
+ return RecoverySource.SnapshotRecoverySource.NO_API_RESTORE_UUID;
|
|
|
+ case SNAPSHOT:
|
|
|
+ // the recovery source may be correct, or we may be recovering it from a real snapshot (i.e. a backup)
|
|
|
+
|
|
|
+ final RecoverySource.SnapshotRecoverySource recoverySource = (RecoverySource.SnapshotRecoverySource) shardRouting
|
|
|
+ .recoverySource();
|
|
|
+ if (recoverySource.restoreUUID().equals(RecoverySource.SnapshotRecoverySource.NO_API_RESTORE_UUID)) {
|
|
|
+ // this shard already has the right recovery ID, but maybe the repository name is now different, so check if it needs
|
|
|
+ // fixing up again
|
|
|
+ return RecoverySource.SnapshotRecoverySource.NO_API_RESTORE_UUID;
|
|
|
+ }
|
|
|
+
|
|
|
+ // else we're recovering from a real snapshot, in which case we can only fix up the recovery source once the "real"
|
|
|
+ // recovery attempt has completed. It might succeed, but if it doesn't then we replace it with a dummy restore to bypass
|
|
|
+ // the RestoreInProgressAllocationDecider
|
|
|
+
|
|
|
+ final RestoreInProgress restoreInProgress = allocation.custom(RestoreInProgress.TYPE);
|
|
|
+ if (restoreInProgress == null) {
|
|
|
+ // no restores are in progress at all, so this shard's restore attempt has definitely completed
|
|
|
+ return RecoverySource.SnapshotRecoverySource.NO_API_RESTORE_UUID;
|
|
|
+ }
|
|
|
+
|
|
|
+ final RestoreInProgress.Entry entry = restoreInProgress.get(recoverySource.restoreUUID());
|
|
|
+ if (entry == null) {
|
|
|
+ // the restore identified by this shard's restore UUID is not ongoing, so this shard's attempt has definitely completed
|
|
|
+ return RecoverySource.SnapshotRecoverySource.NO_API_RESTORE_UUID;
|
|
|
+ }
|
|
|
+
|
|
|
+ // else this specific restore is still ongoing, so check whether this shard has completed its attempt yet
|
|
|
+ final RestoreInProgress.ShardRestoreStatus shardRestoreStatus = entry.shards().get(shardRouting.shardId());
|
|
|
+ if (shardRestoreStatus == null || shardRestoreStatus.state().completed()) {
|
|
|
+ // this shard has no status in this restore, or its status is completed, so it is no longer pending and we can fix up its restore UUID
|
|
|
+ return RecoverySource.SnapshotRecoverySource.NO_API_RESTORE_UUID;
|
|
|
+ } else {
|
|
|
+ // this shard is still pending in its specific restore so we must preserve its restore UUID, but we can fix up
|
|
|
+ // the repository name anyway
|
|
|
+ return recoverySource.restoreUUID();
|
|
|
+ }
|
|
|
+ default:
|
|
|
+ return null; // any other recovery source type is left as it is; the caller skips the fix-up on null
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
private AllocateUnassignedDecision decideAllocation(RoutingAllocation allocation, ShardRouting shardRouting) {
|
|
|
assert shardRouting.unassigned();
|
|
|
assert ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_SETTING.get(
|