|
@@ -21,6 +21,9 @@ package org.elasticsearch.repositories.s3;
|
|
|
|
|
|
import org.apache.logging.log4j.LogManager;
|
|
|
import org.apache.logging.log4j.Logger;
|
|
|
+import org.elasticsearch.action.ActionListener;
|
|
|
+import org.elasticsearch.action.ActionRunnable;
|
|
|
+import org.elasticsearch.cluster.metadata.MetaData;
|
|
|
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
|
|
|
import org.elasticsearch.cluster.service.ClusterService;
|
|
|
import org.elasticsearch.common.Strings;
|
|
@@ -29,11 +32,23 @@ import org.elasticsearch.common.blobstore.BlobStore;
|
|
|
import org.elasticsearch.common.settings.Setting;
|
|
|
import org.elasticsearch.common.unit.ByteSizeUnit;
|
|
|
import org.elasticsearch.common.unit.ByteSizeValue;
|
|
|
+import org.elasticsearch.common.unit.TimeValue;
|
|
|
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
|
|
import org.elasticsearch.monitor.jvm.JvmInfo;
|
|
|
import org.elasticsearch.repositories.RepositoryException;
|
|
|
+import org.elasticsearch.repositories.ShardGenerations;
|
|
|
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
|
|
|
-
|
|
|
+import org.elasticsearch.snapshots.SnapshotId;
|
|
|
+import org.elasticsearch.snapshots.SnapshotInfo;
|
|
|
+import org.elasticsearch.snapshots.SnapshotShardFailure;
|
|
|
+import org.elasticsearch.snapshots.SnapshotsService;
|
|
|
+import org.elasticsearch.threadpool.Scheduler;
|
|
|
+import org.elasticsearch.threadpool.ThreadPool;
|
|
|
+
|
|
|
+import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
+import java.util.concurrent.atomic.AtomicReference;
|
|
|
import java.util.function.Function;
|
|
|
|
|
|
/**
|
|
@@ -126,6 +141,23 @@ class S3Repository extends BlobStoreRepository {
|
|
|
|
|
|
static final Setting<String> CLIENT_NAME = new Setting<>("client", "default", Function.identity());
|
|
|
|
|
|
+ /**
|
|
|
+ * Artificial delay to introduce after a snapshot finalization or delete has finished so long as the repository is still using the
|
|
|
+ * backwards compatible snapshot format from before
|
|
|
+ * {@link org.elasticsearch.snapshots.SnapshotsService#SHARD_GEN_IN_REPO_DATA_VERSION} ({@link org.elasticsearch.Version#V_7_6_0}).
|
|
|
+ * This delay is necessary so that the eventually consistent nature of AWS S3 does not randomly result in repository corruption when
|
|
|
+ * doing repository operations in rapid succession on a repository in the old metadata format.
|
|
|
+ * This setting should not be adjusted in production when working with an AWS S3 backed repository. Doing so risks the repository
|
|
|
+ * becoming silently corrupted. To get rid of this waiting period, either create a new S3 repository or remove all snapshots older than
|
|
|
+ * {@link org.elasticsearch.Version#V_7_6_0} from the repository which will trigger an upgrade of the repository metadata to the new
|
|
|
+ * format and disable the cooldown period.
|
|
|
+ */
|
|
|
+ static final Setting<TimeValue> COOLDOWN_PERIOD = Setting.timeSetting(
|
|
|
+ "cooldown_period",
|
|
|
+ new TimeValue(3, TimeUnit.MINUTES),
|
|
|
+ new TimeValue(0, TimeUnit.MILLISECONDS),
|
|
|
+ Setting.Property.Dynamic);
|
|
|
+
|
|
|
/**
|
|
|
* Specifies the path within bucket to repository data. Defaults to root directory.
|
|
|
*/
|
|
@@ -145,6 +177,12 @@ class S3Repository extends BlobStoreRepository {
|
|
|
|
|
|
private final String cannedACL;
|
|
|
|
|
|
+ /**
|
|
|
+ * Time period to delay repository operations by after finalizing or deleting a snapshot.
|
|
|
+ * See {@link #COOLDOWN_PERIOD} for details.
|
|
|
+ */
|
|
|
+ private final TimeValue coolDown;
|
|
|
+
|
|
|
/**
|
|
|
* Constructs an s3 backed repository
|
|
|
*/
|
|
@@ -176,6 +214,8 @@ class S3Repository extends BlobStoreRepository {
|
|
|
this.storageClass = STORAGE_CLASS_SETTING.get(metadata.settings());
|
|
|
this.cannedACL = CANNED_ACL_SETTING.get(metadata.settings());
|
|
|
|
|
|
+ coolDown = COOLDOWN_PERIOD.get(metadata.settings());
|
|
|
+
|
|
|
logger.debug(
|
|
|
"using bucket [{}], chunk_size [{}], server_side_encryption [{}], buffer_size [{}], cannedACL [{}], storageClass [{}]",
|
|
|
bucket,
|
|
@@ -186,6 +226,70 @@ class S3Repository extends BlobStoreRepository {
|
|
|
storageClass);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Holds a reference to delayed repository operation {@link Scheduler.Cancellable} so it can be cancelled should the repository be
|
|
|
+ * closed concurrently.
|
|
|
+ */
|
|
|
+ private final AtomicReference<Scheduler.Cancellable> finalizationFuture = new AtomicReference<>();
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure, int totalShards,
|
|
|
+ List<SnapshotShardFailure> shardFailures, long repositoryStateId, boolean includeGlobalState,
|
|
|
+ MetaData clusterMetaData, Map<String, Object> userMetadata, boolean writeShardGens,
|
|
|
+ ActionListener<SnapshotInfo> listener) {
|
|
|
+ if (writeShardGens == false) {
|
|
|
+ listener = delayedListener(listener);
|
|
|
+ }
|
|
|
+ super.finalizeSnapshot(snapshotId, shardGenerations, startTime, failure, totalShards, shardFailures, repositoryStateId,
|
|
|
+ includeGlobalState, clusterMetaData, userMetadata, writeShardGens, listener);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, boolean writeShardGens, ActionListener<Void> listener) {
|
|
|
+ if (writeShardGens == false) {
|
|
|
+ listener = delayedListener(listener);
|
|
|
+ }
|
|
|
+ super.deleteSnapshot(snapshotId, repositoryStateId, writeShardGens, listener);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Wraps given listener such that it is executed with a delay of {@link #coolDown} on the snapshot thread-pool after being invoked.
|
|
|
+ * See {@link #COOLDOWN_PERIOD} for details.
|
|
|
+ */
|
|
|
+ private <T> ActionListener<T> delayedListener(ActionListener<T> listener) {
|
|
|
+ final ActionListener<T> wrappedListener = ActionListener.runBefore(listener, () -> {
|
|
|
+ final Scheduler.Cancellable cancellable = finalizationFuture.getAndSet(null);
|
|
|
+ assert cancellable != null;
|
|
|
+ });
|
|
|
+ return new ActionListener<>() {
|
|
|
+ @Override
|
|
|
+ public void onResponse(T response) {
|
|
|
+ logCooldownInfo();
|
|
|
+ final Scheduler.Cancellable existing = finalizationFuture.getAndSet(
|
|
|
+ threadPool.schedule(ActionRunnable.wrap(wrappedListener, l -> l.onResponse(response)),
|
|
|
+ coolDown, ThreadPool.Names.SNAPSHOT));
|
|
|
+ assert existing == null : "Already have an ongoing finalization " + finalizationFuture;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void onFailure(Exception e) {
|
|
|
+ logCooldownInfo();
|
|
|
+ final Scheduler.Cancellable existing = finalizationFuture.getAndSet(
|
|
|
+ threadPool.schedule(ActionRunnable.wrap(wrappedListener, l -> l.onFailure(e)), coolDown, ThreadPool.Names.SNAPSHOT));
|
|
|
+ assert existing == null : "Already have an ongoing finalization " + finalizationFuture;
|
|
|
+ }
|
|
|
+ };
|
|
|
+ }
|
|
|
+
|
|
|
+ private void logCooldownInfo() {
|
|
|
+ logger.info("Sleeping for [{}] after modifying repository [{}] because it contains snapshots older than version [{}]" +
|
|
|
+ " and therefore is using a backwards compatible metadata format that requires this cooldown period to avoid " +
|
|
|
+ "repository corruption. To get rid of this message and move to the new repository metadata format, either remove " +
|
|
|
+ "all snapshots older than version [{}] from the repository or create a new repository at an empty location.",
|
|
|
+ coolDown, metadata.name(), SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION,
|
|
|
+ SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION);
|
|
|
+ }
|
|
|
+
|
|
|
private static BlobPath buildBasePath(RepositoryMetaData metadata) {
|
|
|
final String basePath = BASE_PATH_SETTING.get(metadata.settings());
|
|
|
if (Strings.hasLength(basePath)) {
|
|
@@ -210,4 +314,14 @@ class S3Repository extends BlobStoreRepository {
|
|
|
protected ByteSizeValue chunkSize() {
|
|
|
return chunkSize;
|
|
|
}
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected void doClose() {
|
|
|
+ final Scheduler.Cancellable cancellable = finalizationFuture.getAndSet(null);
|
|
|
+ if (cancellable != null) {
|
|
|
+ logger.debug("Repository [{}] closed during cool-down period", metadata.name());
|
|
|
+ cancellable.cancel();
|
|
|
+ }
|
|
|
+ super.doClose();
|
|
|
+ }
|
|
|
}
|