|
@@ -313,11 +313,11 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
|
|
|
logger.trace("[{}][{}] creating snapshot for indices [{}]", repositoryName, snapshotName, indices);
|
|
|
|
|
|
- final List<IndexId> indexIds = repositoryData.resolveNewIndices(
|
|
|
- indices, getInFlightIndexIds(runningSnapshots, repositoryName));
|
|
|
+ final Map<String, IndexId> indexIds =
|
|
|
+ repositoryData.resolveNewIndices(indices, getInFlightIndexIds(runningSnapshots, repositoryName));
|
|
|
final Version version = minCompatibleVersion(currentState.nodes().getMinNodeVersion(), repositoryData, null);
|
|
|
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards = shards(snapshots, deletionsInProgress, currentState.metadata(),
|
|
|
- currentState.routingTable(), indexIds, useShardGenerations(version), repositoryData, repositoryName);
|
|
|
+ currentState.routingTable(), indexIds.values(), useShardGenerations(version), repositoryData, repositoryName);
|
|
|
if (request.partial() == false) {
|
|
|
Set<String> missing = new HashSet<>();
|
|
|
for (ObjectObjectCursor<ShardId, SnapshotsInProgress.ShardSnapshotStatus> entry : shards) {
|
|
@@ -369,9 +369,13 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
}
|
|
|
|
|
|
private static Map<String, IndexId> getInFlightIndexIds(List<SnapshotsInProgress.Entry> runningSnapshots, String repositoryName) {
|
|
|
- return runningSnapshots.stream().filter(entry -> entry.repository().equals(repositoryName))
|
|
|
- .flatMap(entry -> entry.indices().stream()).distinct()
|
|
|
- .collect(Collectors.toMap(IndexId::getName, Function.identity()));
|
|
|
+ final Map<String, IndexId> allIndices = new HashMap<>();
|
|
|
+ for (SnapshotsInProgress.Entry runningSnapshot : runningSnapshots) {
|
|
|
+ if (runningSnapshot.repository().equals(repositoryName)) {
|
|
|
+ allIndices.putAll(runningSnapshot.indices());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return Collections.unmodifiableMap(allIndices);
|
|
|
}
|
|
|
|
|
|
// TODO: It is worth revisiting the design choice of creating a placeholder entry in snapshots-in-progress here once we have a cache
|
|
@@ -477,7 +481,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
* @param cloneEntry clone operation in the cluster state
|
|
|
*/
|
|
|
private void startCloning(Repository repository, SnapshotsInProgress.Entry cloneEntry) {
|
|
|
- final List<IndexId> indices = cloneEntry.indices();
|
|
|
+ final Collection<IndexId> indices = cloneEntry.indices().values();
|
|
|
final SnapshotId sourceSnapshot = cloneEntry.source();
|
|
|
final Snapshot targetSnapshot = cloneEntry.snapshot();
|
|
|
|
|
@@ -689,11 +693,9 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
|
|
|
private static ShardGenerations buildGenerations(SnapshotsInProgress.Entry snapshot, Metadata metadata) {
|
|
|
ShardGenerations.Builder builder = ShardGenerations.builder();
|
|
|
- final Map<String, IndexId> indexLookup = new HashMap<>();
|
|
|
- snapshot.indices().forEach(idx -> indexLookup.put(idx.getName(), idx));
|
|
|
if (snapshot.isClone()) {
|
|
|
snapshot.clones().forEach(c -> {
|
|
|
- final IndexId indexId = indexLookup.get(c.key.indexName());
|
|
|
+ final IndexId indexId = snapshot.indices().get(c.key.indexName());
|
|
|
builder.put(indexId, c.key.shardId(), c.value.generation());
|
|
|
});
|
|
|
} else {
|
|
@@ -703,7 +705,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
"Index [" + c.key.getIndex() + "] was deleted during a snapshot but snapshot was not partial.";
|
|
|
return;
|
|
|
}
|
|
|
- final IndexId indexId = indexLookup.get(c.key.getIndexName());
|
|
|
+ final IndexId indexId = snapshot.indices().get(c.key.getIndexName());
|
|
|
if (indexId != null) {
|
|
|
builder.put(indexId, c.key.id(), c.value.generation());
|
|
|
}
|
|
@@ -717,7 +719,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
if (snapshot.includeGlobalState() == false) {
|
|
|
// Remove global state from the cluster state
|
|
|
builder = Metadata.builder();
|
|
|
- for (IndexId index : snapshot.indices()) {
|
|
|
+ for (IndexId index : snapshot.indices().values()) {
|
|
|
final IndexMetadata indexMetadata = metadata.index(index.getName());
|
|
|
if (indexMetadata == null) {
|
|
|
assert snapshot.partial() : "Index [" + index + "] was deleted during a snapshot but snapshot was not partial.";
|
|
@@ -731,7 +733,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
// Only keep those data streams in the metadata that were actually requested by the initial snapshot create operation and that have
|
|
|
// all their indices contained in the snapshot
|
|
|
final Map<String, DataStream> dataStreams = new HashMap<>();
|
|
|
- final Set<String> indicesInSnapshot = snapshot.indices().stream().map(IndexId::getName).collect(Collectors.toSet());
|
|
|
+ final Set<String> indicesInSnapshot = snapshot.indices().keySet();
|
|
|
for (String dataStreamName : snapshot.dataStreams()) {
|
|
|
DataStream dataStream = metadata.dataStreams().get(dataStreamName);
|
|
|
if (dataStream == null) {
|
|
@@ -1237,7 +1239,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
final Metadata existing = repo.getSnapshotGlobalMetadata(entry.source());
|
|
|
final Metadata.Builder metaBuilder = Metadata.builder(existing);
|
|
|
final Set<Index> existingIndices = new HashSet<>();
|
|
|
- for (IndexId index : entry.indices()) {
|
|
|
+ for (IndexId index : entry.indices().values()) {
|
|
|
final IndexMetadata indexMetadata = repo.getSnapshotIndexMetaData(repositoryData, entry.source(), index);
|
|
|
existingIndices.add(indexMetadata.getIndex());
|
|
|
metaBuilder.put(indexMetadata, false);
|
|
@@ -2189,8 +2191,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
} else {
|
|
|
if (shardAssignments == null) {
|
|
|
shardAssignments = shards(snapshotsInProgress,
|
|
|
- updatedDeletions, currentState.metadata(), currentState.routingTable(), entry.indices(),
|
|
|
- entry.version().onOrAfter(SHARD_GEN_IN_REPO_DATA_VERSION), repositoryData, repoName);
|
|
|
+ updatedDeletions, currentState.metadata(), currentState.routingTable(), entry.indices().values(),
|
|
|
+ entry.version().onOrAfter(SHARD_GEN_IN_REPO_DATA_VERSION), repositoryData, repoName);
|
|
|
}
|
|
|
final ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> updatedAssignmentsBuilder =
|
|
|
ImmutableOpenMap.builder(entry.shards());
|
|
@@ -2291,7 +2293,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
*/
|
|
|
private static ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shards(
|
|
|
SnapshotsInProgress snapshotsInProgress, SnapshotDeletionsInProgress deletionsInProgress,
|
|
|
- Metadata metadata, RoutingTable routingTable, List<IndexId> indices, boolean useShardGenerations,
|
|
|
+ Metadata metadata, RoutingTable routingTable, Collection<IndexId> indices, boolean useShardGenerations,
|
|
|
RepositoryData repositoryData, String repoName) {
|
|
|
ImmutableOpenMap.Builder<ShardId, SnapshotsInProgress.ShardSnapshotStatus> builder = ImmutableOpenMap.builder();
|
|
|
final ShardGenerations shardGenerations = repositoryData.shardGenerations();
|
|
@@ -2390,8 +2392,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
final Set<Index> indices = new HashSet<>();
|
|
|
for (final SnapshotsInProgress.Entry entry : snapshots.entries()) {
|
|
|
if (entry.partial() == false) {
|
|
|
- for (IndexId index : entry.indices()) {
|
|
|
- IndexMetadata indexMetadata = currentState.metadata().index(index.getName());
|
|
|
+ for (String indexName : entry.indices().keySet()) {
|
|
|
+ IndexMetadata indexMetadata = currentState.metadata().index(indexName);
|
|
|
if (indexMetadata != null && indicesToCheck.contains(indexMetadata.getIndex())) {
|
|
|
indices.add(indexMetadata.getIndex());
|
|
|
}
|
|
@@ -2541,9 +2543,6 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
// builder for updated shard clone status mappings if any could be computed
|
|
|
private ImmutableOpenMap.Builder<RepositoryShardId, ShardSnapshotStatus> clonesBuilder = null;
|
|
|
|
|
|
- // index lookup (index name -> IndexId) cache for #entry used to translate from ShardId to RepositoryShardId
|
|
|
- private Map<String, IndexId> indicesLookup = null;
|
|
|
-
|
|
|
EntryContext(SnapshotsInProgress.Entry entry, Iterator<ShardSnapshotUpdate> iterator) {
|
|
|
this.entry = entry;
|
|
|
this.iterator = iterator;
|
|
@@ -2690,13 +2689,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
|
|
|
private void tryStartCloneAfterSnapshotFinish(ShardId shardId, ShardSnapshotStatus updatedState) {
|
|
|
// shard snapshot was completed, we check if we can start a clone operation for the same repo shard
|
|
|
- if (indicesLookup == null) {
|
|
|
- // cache lookup of index name to IndexId for future ShardId -> RepositoryShardId translations
|
|
|
- indicesLookup = entry.indices().stream().collect(Collectors.toMap(IndexId::getName, Function.identity()));
|
|
|
- }
|
|
|
- final IndexId indexId = indicesLookup.get(shardId.getIndexName());
|
|
|
+ final IndexId indexId = entry.indices().get(shardId.getIndexName());
|
|
|
// If the lookup finds the index id then at least the entry is concerned with the index id just updated
|
|
|
- // so we check on a shard level
|
|
|
if (indexId != null) {
|
|
|
final RepositoryShardId repoShardId = new RepositoryShardId(indexId, shardId.getId());
|
|
|
if (isQueued(entry.clones().get(repoShardId))) {
|