@@ -64,9 +64,11 @@ import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.component.AbstractLifecycleComponent;
+import org.elasticsearch.common.compress.CompressorFactory;
 import org.elasticsearch.common.compress.NotXContentException;
 import org.elasticsearch.common.io.Streams;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
+import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
 import org.elasticsearch.common.metrics.CounterMetric;
@@ -77,6 +79,7 @@ import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
+import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.common.xcontent.XContentType;
@@ -129,6 +132,7 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import java.util.stream.LongStream;
@@ -205,8 +209,16 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
     public static final Setting<Boolean> ALLOW_CONCURRENT_MODIFICATION =
         Setting.boolSetting("allow_concurrent_modifications", false, Setting.Property.Deprecated);
 
+    /**
+     * Setting to disable caching of the latest repository data.
+     */
+    public static final Setting<Boolean> CACHE_REPOSITORY_DATA =
+        Setting.boolSetting("cache_repository_data", true);
+
     private final boolean compress;
 
+    private final boolean cacheRepositoryData;
+
     private final RateLimiter snapshotRateLimiter;
 
     private final RateLimiter restoreRateLimiter;
@@ -282,6 +294,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
         snapshotRateLimiter = getRateLimiter(metadata.settings(), "max_snapshot_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB));
         restoreRateLimiter = getRateLimiter(metadata.settings(), "max_restore_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB));
         readOnly = metadata.settings().getAsBoolean("readonly", false);
+        cacheRepositoryData = CACHE_REPOSITORY_DATA.get(metadata.settings());
         this.basePath = basePath;
 
         indexShardSnapshotFormat = new ChecksumBlobStoreFormat<>(SNAPSHOT_CODEC, SNAPSHOT_NAME_FORMAT,
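The new flag is an ordinary repository setting, resolved from the repository's own metadata just like "readonly" above. A minimal sketch of how it resolves, assuming hand-built Settings in place of real repository metadata (in practice the settings arrive with the PUT _snapshot repository registration):

    // Sketch only: "cache_repository_data" defaults to true and can be set to
    // false per repository to opt out of the on-heap cache.
    Settings repoSettings = Settings.builder()
        .put("cache_repository_data", false)
        .build();
    boolean caching = BlobStoreRepository.CACHE_REPOSITORY_DATA.get(repoSettings); // false
    boolean byDefault = BlobStoreRepository.CACHE_REPOSITORY_DATA.get(Settings.EMPTY); // true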
@@ -510,13 +523,16 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
      * @param rootBlobs Blobs at the repository root
      * @return RepositoryData
      */
-    private RepositoryData safeRepositoryData(long repositoryStateId, Map<String, BlobMetaData> rootBlobs) {
+    private RepositoryData safeRepositoryData(long repositoryStateId, Map<String, BlobMetaData> rootBlobs) throws IOException {
         final long generation = latestGeneration(rootBlobs.keySet());
         final long genToLoad;
+        final Tuple<Long, BytesReference> cached;
         if (bestEffortConsistency) {
             genToLoad = latestKnownRepoGen.updateAndGet(known -> Math.max(known, repositoryStateId));
+            cached = null;
         } else {
             genToLoad = latestKnownRepoGen.get();
+            cached = latestKnownRepositoryData.get();
         }
         if (genToLoad > generation) {
             // It's always a possibility to not see the latest index-N in the listing here on an eventually consistent blob store, just
@@ -529,6 +545,9 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
             throw new RepositoryException(metadata.name(), "concurrent modification of the index-N file, expected current generation [" +
                 repositoryStateId + "], actual current generation [" + genToLoad + "]");
         }
+        if (cached != null && cached.v1() == genToLoad) {
+            return repositoryDataFromCachedEntry(cached);
+        }
         return getRepositoryData(genToLoad);
     }
 
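The cache entry pairs a repository generation with the serialized bytes, so a hit requires an exact generation match and anything else falls through to a blob-store read; under bestEffortConsistency the entry is ignored outright, since generations may be reused there. An illustrative check with hypothetical values:

    // Hypothetical values: the entry only counts as a hit when its generation
    // equals the generation resolved for loading.
    Tuple<Long, BytesReference> entry = new Tuple<>(7L, new BytesArray(new byte[0]));
    long genToLoad = 7L;
    if (entry.v1() == genToLoad) {
        // safe: these bytes are exactly the contents of index-7
    }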
@@ -1057,6 +1076,9 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
     // and concurrent modifications.
     private final AtomicLong latestKnownRepoGen = new AtomicLong(RepositoryData.UNKNOWN_REPO_GEN);
 
+    // Best effort cache of the latest known repository data and its generation, serialized as compressed JSON
+    private final AtomicReference<Tuple<Long, BytesReference>> latestKnownRepositoryData = new AtomicReference<>();
+
     @Override
     public void getRepositoryData(ActionListener<RepositoryData> listener) {
         if (latestKnownRepoGen.get() == RepositoryData.CORRUPTED_REPO_GEN) {
@@ -1090,7 +1112,16 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                 genToLoad = latestKnownRepoGen.get();
             }
             try {
-                listener.onResponse(getRepositoryData(genToLoad));
+                final Tuple<Long, BytesReference> cached = latestKnownRepositoryData.get();
+                final RepositoryData loaded;
+                // Caching is not used with #bestEffortConsistency, see docs on #cacheRepositoryData for details
+                if (bestEffortConsistency == false && cached != null && cached.v1() == genToLoad) {
+                    loaded = repositoryDataFromCachedEntry(cached);
+                } else {
+                    loaded = getRepositoryData(genToLoad);
+                    cacheRepositoryData(loaded);
+                }
+                listener.onResponse(loaded);
                 return;
             } catch (RepositoryException e) {
                 // If the generation to load changed concurrently and we didn't just try loading the same generation before we retry
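This makes the listener path a plain read-through cache: a generation match short-circuits the index-N blob read, while a miss loads from the blob store and repopulates the cache. An illustrative caller, assuming a BlobStoreRepository instance named repository:

    // Illustrative only: PlainActionFuture adapts the ActionListener callback
    // into a blocking call; a cache hit never touches the blob store.
    PlainActionFuture<RepositoryData> future = PlainActionFuture.newFuture();
    repository.getRepositoryData(future);
    RepositoryData data = future.actionGet();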
@@ -1116,6 +1147,59 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
         }
     }
 
+    /**
+     * Puts the given {@link RepositoryData} into the cache if it is of a newer generation and only if the repository is not using
+     * {@link #bestEffortConsistency}. When using {@link #bestEffortConsistency} the repository uses listing to find the latest
+     * {@code index-N} blob and there are no hard guarantees that a given repository generation won't be reused, since an external
+     * modification can lead to moving from a higher {@code N} to a lower {@code N} value, which means we can't safely assume that a
+     * given generation will always contain the same {@link RepositoryData}.
+     *
+     * @param updated RepositoryData to cache if newer than the cache contents
+     */
+    private void cacheRepositoryData(RepositoryData updated) {
+        if (cacheRepositoryData && bestEffortConsistency == false) {
+            final BytesReference serialized;
+            BytesStreamOutput out = new BytesStreamOutput();
+            try {
+                try (StreamOutput tmp = CompressorFactory.COMPRESSOR.streamOutput(out);
+                     XContentBuilder builder = XContentFactory.jsonBuilder(tmp)) {
+                    updated.snapshotsToXContent(builder, true);
+                }
+                serialized = out.bytes();
+                final int len = serialized.length();
+                if (len > ByteSizeUnit.KB.toBytes(500)) {
+                    logger.debug("Not caching repository data of size [{}] for repository [{}] because it is larger than 500KB in" +
+                        " serialized size", len, metadata.name());
+                    if (len > ByteSizeUnit.MB.toBytes(5)) {
+                        logger.warn("Your repository metadata blob for repository [{}] is larger than 5MB. Consider moving to a fresh" +
+                            " repository for new snapshots or deleting unneeded snapshots from your repository to ensure stable" +
+                            " repository behavior going forward.", metadata.name());
+                    }
+                    // Clear the cache rather than waste heap on an outdated cached value
+                    latestKnownRepositoryData.set(null);
+                    return;
+                }
+            } catch (IOException e) {
+                assert false : new AssertionError("Impossible, no IO happens here", e);
+                logger.warn("Failed to serialize repository data", e);
+                return;
+            }
+            latestKnownRepositoryData.updateAndGet(known -> {
+                if (known != null && known.v1() > updated.getGenId()) {
+                    return known;
+                }
+                return new Tuple<>(updated.getGenId(), serialized);
+            });
+        }
+    }
+
+    private RepositoryData repositoryDataFromCachedEntry(Tuple<Long, BytesReference> cacheEntry) throws IOException {
+        return RepositoryData.snapshotsFromXContent(
+            XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY,
+                LoggingDeprecationHandler.INSTANCE,
+                CompressorFactory.COMPRESSOR.streamInput(cacheEntry.v2().streamInput())), cacheEntry.v1());
+    }
+
     private RepositoryException corruptedStateException(@Nullable Exception cause) {
         return new RepositoryException(metadata.name(),
             "Could not read repository data because the contents of the repository do not match its " +
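Caching the compressed JSON bytes rather than the parsed RepositoryData trades a little CPU on each hit for a much smaller steady-state heap footprint. A standalone sketch of the same compress-then-parse round-trip, using a toy payload in place of RepositoryData (java.util.Map import assumed):

    BytesStreamOutput out = new BytesStreamOutput();
    try (StreamOutput compressed = CompressorFactory.COMPRESSOR.streamOutput(out);
         XContentBuilder builder = XContentFactory.jsonBuilder(compressed)) {
        builder.startObject().field("gen", 7).endObject(); // stand-in for snapshotsToXContent
    }
    BytesReference serialized = out.bytes(); // compressed JSON, cheap to keep on heap
    try (XContentParser parser = XContentType.JSON.xContent().createParser(
            NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE,
            CompressorFactory.COMPRESSOR.streamInput(serialized.streamInput()))) {
        Map<String, Object> roundTripped = parser.map(); // {gen=7}
    }

The 500KB guard bounds the worst-case heap cost per repository, and the updateAndGet loop refuses to replace a newer cached generation with an older one, so concurrent loads cannot regress the cache.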
@@ -1362,6 +1446,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
 
             @Override
             public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
+                cacheRepositoryData(filteredRepositoryData.withGenId(newGen));
                 threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.run(listener, () -> {
                     // Delete all now outdated index files up to 1000 blobs back from the new generation.
                     // If there are more than 1000 dangling index-N cleanup functionality on repo delete will take care of them.
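Finally, the writer populates the cache as soon as the cluster state carrying the new safe generation commits: the just-written RepositoryData, stamped with newGen, is cached before cleanup of the outdated index-N blobs is even scheduled, so follow-up reads are served from heap instead of re-reading the blob that was just written.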
|