@@ -68,6 +68,7 @@ import org.elasticsearch.common.component.AbstractLifecycleComponent;
 import org.elasticsearch.common.compress.CompressorFactory;
 import org.elasticsearch.common.compress.NotXContentException;
 import org.elasticsearch.common.io.Streams;
+import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
 import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
@@ -77,10 +78,12 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
+import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.common.xcontent.XContentType;
@@ -261,6 +264,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
 
     private final NamedXContentRegistry namedXContentRegistry;
 
+    private final BigArrays bigArrays;
+
     /**
      * Flag that is set to {@code true} if this instance is started with {@link #metadata} that has a higher value for
      * {@link RepositoryMetadata#pendingGeneration()} than for {@link RepositoryMetadata#generation()} indicating a full cluster restart
@@ -300,11 +305,13 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
         final RepositoryMetadata metadata,
         final NamedXContentRegistry namedXContentRegistry,
         final ClusterService clusterService,
+        final BigArrays bigArrays,
         final RecoverySettings recoverySettings,
         final BlobPath basePath) {
         this.metadata = metadata;
         this.threadPool = clusterService.getClusterApplierService().threadPool();
         this.clusterService = clusterService;
+        this.bigArrays = bigArrays;
         this.recoverySettings = recoverySettings;
         this.compress = COMPRESS_SETTING.get(metadata.settings());
         this.supportURLRepo = SUPPORT_URL_REPO.get(metadata.settings());
@@ -453,9 +460,9 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
             logger.trace("[{}] [{}] writing shard snapshot file for clone", shardId, target);
             INDEX_SHARD_SNAPSHOT_FORMAT.write(sourceMeta.asClone(target.getName(), startTime,
                 threadPool.absoluteTimeInMillis() - startTime),
-                shardContainer, target.getUUID(), compress);
+                shardContainer, target.getUUID(), compress, bigArrays);
             INDEX_SHARD_SNAPSHOTS_FORMAT.write(existingSnapshots.withClone(source.getName(), target.getName()), shardContainer, newGen,
-                compress);
+                compress, bigArrays);
             return newGen;
         }));
     }
@@ -798,7 +805,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                 for (String indexMetaGeneration : indexMetaGenerations) {
                     executor.execute(ActionRunnable.supply(allShardCountsListener, () -> {
                         try {
-                            return INDEX_METADATA_FORMAT.read(indexContainer, indexMetaGeneration, namedXContentRegistry).getNumberOfShards();
+                            return INDEX_METADATA_FORMAT.read(indexContainer, indexMetaGeneration, namedXContentRegistry,
+                                bigArrays).getNumberOfShards();
                         } catch (Exception ex) {
                             logger.warn(() -> new ParameterizedMessage(
                                 "[{}] [{}] failed to read metadata for index", indexMetaGeneration, indexId.getName()), ex);
@@ -1112,7 +1120,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
 
             // Write Global MetaData
             executor.execute(ActionRunnable.run(allMetaListener,
-                () -> GLOBAL_METADATA_FORMAT.write(clusterMetadata, blobContainer(), snapshotId.getUUID(), compress)));
+                () -> GLOBAL_METADATA_FORMAT.write(clusterMetadata, blobContainer(), snapshotId.getUUID(), compress, bigArrays)));
 
             // write the index metadata for each index in the snapshot
             for (IndexId index : indices) {
@@ -1124,19 +1132,19 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                             if (metaUUID == null) {
                                 // We don't yet have this version of the metadata so we write it
                                 metaUUID = UUIDs.base64UUID();
-                                INDEX_METADATA_FORMAT.write(indexMetaData, indexContainer(index), metaUUID, compress);
+                                INDEX_METADATA_FORMAT.write(indexMetaData, indexContainer(index), metaUUID, compress, bigArrays);
                                 indexMetaIdentifiers.put(identifiers, metaUUID);
                             }
                             indexMetas.put(index, identifiers);
                         } else {
                             INDEX_METADATA_FORMAT.write(
-                                clusterMetadata.index(index.getName()), indexContainer(index), snapshotId.getUUID(), compress);
+                                clusterMetadata.index(index.getName()), indexContainer(index), snapshotId.getUUID(), compress, bigArrays);
                         }
                     }
                 ));
             }
             executor.execute(ActionRunnable.run(allMetaListener,
-                () -> SNAPSHOT_FORMAT.write(snapshotInfo, blobContainer(), snapshotId.getUUID(), compress)));
+                () -> SNAPSHOT_FORMAT.write(snapshotInfo, blobContainer(), snapshotId.getUUID(), compress, bigArrays)));
         }, onUpdateFailure);
     }
 
@@ -1157,7 +1165,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
     @Override
     public SnapshotInfo getSnapshotInfo(final SnapshotId snapshotId) {
         try {
-            return SNAPSHOT_FORMAT.read(blobContainer(), snapshotId.getUUID(), namedXContentRegistry);
+            return SNAPSHOT_FORMAT.read(blobContainer(), snapshotId.getUUID(), namedXContentRegistry, bigArrays);
         } catch (NoSuchFileException ex) {
             throw new SnapshotMissingException(metadata.name(), snapshotId, ex);
         } catch (IOException | NotXContentException ex) {
@@ -1168,7 +1176,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
     @Override
     public Metadata getSnapshotGlobalMetadata(final SnapshotId snapshotId) {
         try {
-            return GLOBAL_METADATA_FORMAT.read(blobContainer(), snapshotId.getUUID(), namedXContentRegistry);
+            return GLOBAL_METADATA_FORMAT.read(blobContainer(), snapshotId.getUUID(), namedXContentRegistry, bigArrays);
         } catch (NoSuchFileException ex) {
             throw new SnapshotMissingException(metadata.name(), snapshotId, ex);
         } catch (IOException ex) {
@@ -1180,7 +1188,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
     public IndexMetadata getSnapshotIndexMetaData(RepositoryData repositoryData, SnapshotId snapshotId, IndexId index) throws IOException {
         try {
             return INDEX_METADATA_FORMAT.read(indexContainer(index),
-                repositoryData.indexMetaDataGenerations().indexMetaBlobId(snapshotId, index), namedXContentRegistry);
+                repositoryData.indexMetaDataGenerations().indexMetaBlobId(snapshotId, index), namedXContentRegistry, bigArrays);
         } catch (NoSuchFileException e) {
             throw new SnapshotMissingException(metadata.name(), snapshotId, e);
         }
@@ -1340,8 +1348,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                     // We can cache serialized in the most recent version here without regard to the actual repository metadata version
                     // since we're only caching the information that we just wrote and thus won't accidentally cache any information that
                     // isn't safe
-                    cacheRepositoryData(
-                        BytesReference.bytes(loaded.snapshotsToXContent(XContentFactory.jsonBuilder(), Version.CURRENT)), genToLoad);
+                    cacheRepositoryData(compressRepoDataForCache(BytesReference.bytes(
+                        loaded.snapshotsToXContent(XContentFactory.jsonBuilder(), Version.CURRENT))), genToLoad);
                 }
                 listener.onResponse(loaded);
                 return;
@@ -1376,38 +1384,48 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
      * modification can lead to moving from a higher {@code N} to a lower {@code N} value which mean we can't safely assume that a given
      * generation will always contain the same {@link RepositoryData}.
      *
-     * @param updated    serialized RepositoryData to cache if newer than the cache contents
+     * @param serialized serialized RepositoryData to cache if newer than the cache contents or null if no data should be cached for the
+     *                   given generation
      * @param generation repository generation of the given repository data
      */
-    private void cacheRepositoryData(BytesReference updated, long generation) {
-        if (cacheRepositoryData && bestEffortConsistency == false) {
-            final BytesReference serialized;
-            try {
-                serialized = CompressorFactory.COMPRESSOR.compress(updated);
-                final int len = serialized.length();
-                if (len > ByteSizeUnit.KB.toBytes(500)) {
-                    logger.debug("Not caching repository data of size [{}] for repository [{}] because it is larger than 500KB in" +
+    private void cacheRepositoryData(@Nullable BytesReference serialized, long generation) {
+        latestKnownRepositoryData.updateAndGet(known -> {
+            if (known != null && known.v1() > generation) {
+                return known;
+            }
+            return serialized == null ? null : new Tuple<>(generation, serialized);
+        });
+    }
+
+    /**
+     * Creates a compressed version of serialized {@link RepositoryData} that can be used with {@link #cacheRepositoryData} if possible.
+     *
+     * @param uncompressed uncompressed, serialized {@link RepositoryData}
+     * @return compressed repository data to cache or {@code null} if caching is disabled or the data is too large to cache
+     */
+    @Nullable
+    private BytesReference compressRepoDataForCache(BytesReference uncompressed) {
+        if (cacheRepositoryData == false || bestEffortConsistency) {
+            return null;
+        }
+        try {
+            final BytesReference serialized = CompressorFactory.COMPRESSOR.compress(uncompressed);
+            final int len = serialized.length();
+            if (len > ByteSizeUnit.KB.toBytes(500)) {
+                logger.debug("Not caching repository data of size [{}] for repository [{}] because it is larger than 500KB in" +
                     " serialized size", len, metadata.name());
-                    if (len > ByteSizeUnit.MB.toBytes(5)) {
-                        logger.warn("Your repository metadata blob for repository [{}] is larger than 5MB. Consider moving to a fresh" +
+                if (len > ByteSizeUnit.MB.toBytes(5)) {
+                    logger.warn("Your repository metadata blob for repository [{}] is larger than 5MB. Consider moving to a fresh" +
                         " repository for new snapshots or deleting unneeded snapshots from your repository to ensure stable" +
                         " repository behavior going forward.", metadata.name());
-                    }
-                    // Set empty repository data to not waste heap for an outdated cached value
-                    latestKnownRepositoryData.set(null);
-                    return;
                 }
-            } catch (IOException e) {
-                assert false : new AssertionError("Impossible, no IO happens here", e);
-                logger.warn("Failed to serialize repository data", e);
-                return;
+                return null;
             }
-            latestKnownRepositoryData.updateAndGet(known -> {
-                if (known != null && known.v1() > generation) {
-                    return known;
-                }
-                return new Tuple<>(generation, serialized);
-            });
+            return serialized;
+        } catch (IOException e) {
+            assert false : new AssertionError("Impossible, no IO happens here", e);
+            logger.warn("Failed to serialize repository data", e);
+            return null;
         }
     }
 
@@ -1630,9 +1648,15 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
             }
             final String indexBlob = INDEX_FILE_PREFIX + Long.toString(newGen);
             logger.debug("Repository [{}] writing new index generational blob [{}]", metadata.name(), indexBlob);
-            final BytesReference serializedRepoData =
-                BytesReference.bytes(newRepositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), version));
-            writeAtomic(blobContainer(), indexBlob, serializedRepoData, true);
+            final BytesReference repoDataToCache;
+            try (ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(bigArrays)) {
+                try (XContentBuilder xContentBuilder = XContentFactory.jsonBuilder(Streams.noCloseStream(out))) {
+                    newRepositoryData.snapshotsToXContent(xContentBuilder, version);
+                }
+                final BytesReference serializedRepoData = out.bytes();
+                writeAtomic(blobContainer(), indexBlob, serializedRepoData, true);
+                repoDataToCache = compressRepoDataForCache(serializedRepoData);
+            }
             maybeWriteIndexLatest(newGen);
 
             // Step 3: Update CS to reflect new repository generation.
@@ -1664,7 +1688,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
 
                 @Override
                 public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
-                    cacheRepositoryData(serializedRepoData, newGen);
+                    cacheRepositoryData(repoDataToCache, newGen);
                     threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.supply(listener, () -> {
                         // Delete all now outdated index files up to 1000 blobs back from the new generation.
                         // If there are more than 1000 dangling index-N cleanup functionality on repo delete will take care of them.
@@ -1987,7 +2011,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                 // reference a generation that has not had all its files fully upload.
                 indexGeneration = UUIDs.randomBase64UUID();
                 try {
-                    INDEX_SHARD_SNAPSHOTS_FORMAT.write(updatedBlobStoreIndexShardSnapshots, shardContainer, indexGeneration, compress);
+                    INDEX_SHARD_SNAPSHOTS_FORMAT.write(updatedBlobStoreIndexShardSnapshots, shardContainer, indexGeneration, compress,
+                        bigArrays);
                 } catch (IOException e) {
                     throw new IndexShardSnapshotFailedException(shardId,
                         "Failed to write shard level snapshot metadata for [" + snapshotId + "] to ["
@@ -2039,7 +2064,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                     threadPool.absoluteTimeInMillis() - lastSnapshotStatus.getStartTime(),
                     lastSnapshotStatus.getIncrementalFileCount(),
                     lastSnapshotStatus.getIncrementalSize()
-                ), shardContainer, snapshotId.getUUID(), compress);
+                ), shardContainer, snapshotId.getUUID(), compress, bigArrays);
             } catch (IOException e) {
                 throw new IndexShardSnapshotFailedException(shardId, "Failed to write commit point", e);
             }
@@ -2314,7 +2339,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
         final BlobStoreIndexShardSnapshots updatedSnapshots = new BlobStoreIndexShardSnapshots(newSnapshotsList);
         if (indexGeneration < 0L) {
             writtenGeneration = UUIDs.randomBase64UUID();
-            INDEX_SHARD_SNAPSHOTS_FORMAT.write(updatedSnapshots, shardContainer, writtenGeneration, compress);
+            INDEX_SHARD_SNAPSHOTS_FORMAT.write(updatedSnapshots, shardContainer, writtenGeneration, compress, bigArrays);
         } else {
             writtenGeneration = String.valueOf(indexGeneration);
             writeShardIndexBlobAtomic(shardContainer, indexGeneration, updatedSnapshots);
@@ -2339,7 +2364,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
         logger.trace(() -> new ParameterizedMessage("[{}] Writing shard index [{}] to [{}]", metadata.name(),
             indexGeneration, shardContainer.path()));
         final String blobName = INDEX_SHARD_SNAPSHOTS_FORMAT.blobName(String.valueOf(indexGeneration));
-        writeAtomic(shardContainer, blobName, INDEX_SHARD_SNAPSHOTS_FORMAT.serialize(updatedSnapshots, blobName, compress), true);
+        INDEX_SHARD_SNAPSHOTS_FORMAT.serialize(updatedSnapshots, blobName, compress, bigArrays,
+            bytes -> writeAtomic(shardContainer, blobName, bytes, true));
     }
 
     // Unused blobs are all previous index-, data- and meta-blobs and that are not referenced by the new index- as well as all
@@ -2360,7 +2386,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
      */
    public BlobStoreIndexShardSnapshot loadShardSnapshot(BlobContainer shardContainer, SnapshotId snapshotId) {
         try {
-            return INDEX_SHARD_SNAPSHOT_FORMAT.read(shardContainer, snapshotId.getUUID(), namedXContentRegistry);
+            return INDEX_SHARD_SNAPSHOT_FORMAT.read(shardContainer, snapshotId.getUUID(), namedXContentRegistry, bigArrays);
         } catch (NoSuchFileException ex) {
             throw new SnapshotMissingException(metadata.name(), snapshotId, ex);
         } catch (IOException ex) {
@@ -2386,7 +2412,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
             if (generation.equals(ShardGenerations.NEW_SHARD_GEN)) {
                 return new Tuple<>(BlobStoreIndexShardSnapshots.EMPTY, ShardGenerations.NEW_SHARD_GEN);
             }
-            return new Tuple<>(INDEX_SHARD_SNAPSHOTS_FORMAT.read(shardContainer, generation, namedXContentRegistry), generation);
+            return new Tuple<>(INDEX_SHARD_SNAPSHOTS_FORMAT.read(shardContainer, generation, namedXContentRegistry, bigArrays), generation);
         }
         final Tuple<BlobStoreIndexShardSnapshots, Long> legacyIndex = buildBlobStoreIndexShardSnapshots(blobs, shardContainer);
         return new Tuple<>(legacyIndex.v1(), String.valueOf(legacyIndex.v2()));
@@ -2403,7 +2429,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
         long latest = latestGeneration(blobs);
         if (latest >= 0) {
             final BlobStoreIndexShardSnapshots shardSnapshots =
-                INDEX_SHARD_SNAPSHOTS_FORMAT.read(shardContainer, Long.toString(latest), namedXContentRegistry);
+                INDEX_SHARD_SNAPSHOTS_FORMAT.read(shardContainer, Long.toString(latest), namedXContentRegistry, bigArrays);
             return new Tuple<>(shardSnapshots, latest);
         } else if (blobs.stream().anyMatch(b -> b.startsWith(SNAPSHOT_PREFIX) || b.startsWith(INDEX_FILE_PREFIX)
             || b.startsWith(UPLOADED_DATA_BLOB_PREFIX))) {