Browse Source

Add support for partial searchable snapshots to ILM (#68714)

This commit adds support for the recently introduced partial searchable snapshot (#68509) to ILM.

Searchable snapshot ILM actions may now be specified with a `storage` option, specifying either
`full_copy` or `shared_cache` (similar to the "mount" API) to mount either a full or partial
searchable snapshot:

```json
PUT _ilm/policy/my_policy
{
  "policy": {
    "phases": {
      "cold": {
        "actions": {
          "searchable_snapshot" : {
            "snapshot_repository" : "backing_repo",
            "storage": "shared_cache"
          }
        }
      }
    }
  }
}
```

Internally, If more than one searchable snapshot action is specified (for example, a full searchable
snapshot in the "cold" phase and a partial searchable snapshot in the "frozen" phase) ILM will
re-use the existing snapshot when doing the second mount since a second snapshot is not required.

Currently this is allowed for actions that use the same repository, however, multiple
`searchable_snapshot` actions for the same index that use different repositories is not allowed (the
ERROR state is entered). We plan to allow this in the future in subsequent work.

If the `storage` option is not specified in the `searchable_snapshot` action, the mount type
defaults to "shared_cache" in the frozen phase and "full_copy" in all other phases.

Relates to #68605
Lee Hinman 4 years ago
parent
commit
e552fd7ce6
18 changed files with 690 additions and 74 deletions
  1. 8 1
      docs/reference/ilm/actions/ilm-searchable-snapshot.asciidoc
  2. 12 0
      server/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java
  3. 13 0
      server/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java
  4. 1 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/CopyExecutionStateStep.java
  5. 1 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/GenerateSnapshotNameStep.java
  6. 27 5
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/LifecycleExecutionState.java
  7. 43 7
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/MountSnapshotStep.java
  8. 168 25
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SearchableSnapshotAction.java
  9. 16 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/searchablesnapshots/MountSearchableSnapshotRequest.java
  10. 2 0
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/LifecycleExecutionStateTests.java
  11. 102 9
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/MountSnapshotStepTests.java
  12. 54 7
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/SearchableSnapshotActionTests.java
  13. 4 0
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/StepKeyTests.java
  14. 6 2
      x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/TimeSeriesRestDriver.java
  15. 5 3
      x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/LifecycleLicenseIT.java
  16. 1 1
      x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/TimeSeriesDataStreamsIT.java
  17. 1 1
      x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java
  18. 226 13
      x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/actions/SearchableSnapshotActionIT.java

+ 8 - 1
docs/reference/ilm/actions/ilm-searchable-snapshot.asciidoc

@@ -54,6 +54,12 @@ the shards are relocating, in which case they will not be merged.
 The `searchable_snapshot` action will continue executing even if not all shards
 are force merged.
 
+`storage`::
+(Optional, string)
+Specifies the type of snapshot that should be mounted for a searchable snapshot. This corresponds to
+the <<searchable-snapshots-api-mount-query-params, `storage` option when mounting a snapshot>>.
+Defaults to `full_copy` in non-frozen phases, or `shared_cache` in the frozen phase.
+
 [[ilm-searchable-snapshot-ex]]
 ==== Examples
 [source,console]
@@ -65,7 +71,8 @@ PUT _ilm/policy/my_policy
       "cold": {
         "actions": {
           "searchable_snapshot" : {
-            "snapshot_repository" : "backing_repo"
+            "snapshot_repository" : "backing_repo",
+            "storage": "shared_cache"
           }
         }
       }

+ 12 - 0
server/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java

@@ -1262,6 +1262,18 @@ public abstract class StreamInput extends InputStream {
         return readEnum(enumClass, enumClass.getEnumConstants());
     }
 
+    /**
+     * Reads an optional enum with type E that was serialized based on the value of its ordinal
+     */
+    @Nullable
+    public <E extends Enum<E>> E readOptionalEnum(Class<E> enumClass) throws IOException {
+        if (readBoolean()) {
+            return readEnum(enumClass, enumClass.getEnumConstants());
+        } else {
+            return null;
+        }
+    }
+
     private <E extends Enum<E>> E readEnum(Class<E> enumClass, E[] values) throws IOException {
         int ordinal = readVInt();
         if (ordinal < 0 || ordinal >= values.length) {

+ 13 - 0
server/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java

@@ -1267,6 +1267,19 @@ public abstract class StreamOutput extends OutputStream {
         writeVInt(enumValue.ordinal());
     }
 
+    /**
+     * Writes an optional enum with type E based on its ordinal value
+     */
+    public <E extends Enum<E>> void writeOptionalEnum(@Nullable E enumValue) throws IOException {
+        if (enumValue == null) {
+            writeBoolean(false);
+        } else {
+            writeBoolean(true);
+            assert enumValue instanceof XContentType == false : "XContentHelper#writeTo should be used for XContentType serialisation";
+            writeVInt(enumValue.ordinal());
+        }
+    }
+
     /**
      * Writes an EnumSet with type E that by serialized it based on it's ordinal value
      */

+ 1 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/CopyExecutionStateStep.java

@@ -85,6 +85,7 @@ public class CopyExecutionStateStep extends ClusterStateActionStep {
         }
         relevantTargetCustomData.setSnapshotRepository(lifecycleState.getSnapshotRepository());
         relevantTargetCustomData.setSnapshotName(lifecycleState.getSnapshotName());
+        relevantTargetCustomData.setSnapshotIndexName(lifecycleState.getSnapshotIndexName());
 
         Metadata.Builder newMetadata = Metadata.builder(clusterState.getMetadata())
             .put(IndexMetadata.builder(targetIndexMetadata)

+ 1 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/GenerateSnapshotNameStep.java

@@ -79,6 +79,7 @@ public class GenerateSnapshotNameStep extends ClusterStateActionStep {
         }
         newCustomData.setSnapshotName(snapshotName);
         newCustomData.setSnapshotRepository(snapshotRepository);
+        newCustomData.setSnapshotIndexName(index.getName());
 
         IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(indexMetaData);
         indexMetadataBuilder.putCustom(ILM_CUSTOM_METADATA_KEY, newCustomData.build().asMap());

+ 27 - 5
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/LifecycleExecutionState.java

@@ -37,8 +37,9 @@ public class LifecycleExecutionState {
     private static final String FAILED_STEP_RETRY_COUNT = "failed_step_retry_count";
     private static final String STEP_INFO = "step_info";
     private static final String PHASE_DEFINITION = "phase_definition";
-    private static final String SNAPSHOT_NAME ="snapshot_name";
-    private static final String SNAPSHOT_REPOSITORY ="snapshot_repository";
+    private static final String SNAPSHOT_NAME = "snapshot_name";
+    private static final String SNAPSHOT_REPOSITORY = "snapshot_repository";
+    private static final String SNAPSHOT_INDEX_NAME = "snapshot_index_name";
 
     private final String phase;
     private final String action;
@@ -54,10 +55,12 @@ public class LifecycleExecutionState {
     private final Long stepTime;
     private final String snapshotName;
     private final String snapshotRepository;
+    private final String snapshotIndexName;
 
     private LifecycleExecutionState(String phase, String action, String step, String failedStep, Boolean isAutoRetryableError,
                                     Integer failedStepRetryCount, String stepInfo, String phaseDefinition, Long lifecycleDate,
-                                    Long phaseTime, Long actionTime, Long stepTime, String snapshotRepository, String snapshotName) {
+                                    Long phaseTime, Long actionTime, Long stepTime, String snapshotRepository, String snapshotName,
+                                    String snapshotIndexName) {
         this.phase = phase;
         this.action = action;
         this.step = step;
@@ -72,6 +75,7 @@ public class LifecycleExecutionState {
         this.stepTime = stepTime;
         this.snapshotRepository = snapshotRepository;
         this.snapshotName = snapshotName;
+        this.snapshotIndexName = snapshotIndexName;
     }
 
     /**
@@ -131,6 +135,7 @@ public class LifecycleExecutionState {
             .setActionTime(state.actionTime)
             .setSnapshotRepository(state.snapshotRepository)
             .setSnapshotName(state.snapshotName)
+            .setSnapshotIndexName(state.snapshotIndexName)
             .setStepTime(state.stepTime);
     }
 
@@ -198,6 +203,9 @@ public class LifecycleExecutionState {
                     e, STEP_TIME, customData.get(STEP_TIME));
             }
         }
+        if (customData.containsKey(SNAPSHOT_INDEX_NAME)) {
+            builder.setSnapshotIndexName(customData.get(SNAPSHOT_INDEX_NAME));
+        }
         return builder.build();
     }
 
@@ -250,6 +258,9 @@ public class LifecycleExecutionState {
         if (snapshotName != null) {
             result.put(SNAPSHOT_NAME, snapshotName);
         }
+        if (snapshotIndexName != null) {
+            result.put(SNAPSHOT_INDEX_NAME, snapshotIndexName);
+        }
         return Collections.unmodifiableMap(result);
     }
 
@@ -309,6 +320,10 @@ public class LifecycleExecutionState {
         return snapshotRepository;
     }
 
+    public String getSnapshotIndexName() {
+        return snapshotIndexName;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) return true;
@@ -327,6 +342,7 @@ public class LifecycleExecutionState {
             Objects.equals(getStepInfo(), that.getStepInfo()) &&
             Objects.equals(getSnapshotRepository(), that.getSnapshotRepository()) &&
             Objects.equals(getSnapshotName(), that.getSnapshotName()) &&
+            Objects.equals(getSnapshotIndexName(), that.getSnapshotIndexName()) &&
             Objects.equals(getPhaseDefinition(), that.getPhaseDefinition());
     }
 
@@ -334,7 +350,7 @@ public class LifecycleExecutionState {
     public int hashCode() {
         return Objects.hash(getPhase(), getAction(), getStep(), getFailedStep(), isAutoRetryableError(), getFailedStepRetryCount(),
             getStepInfo(), getPhaseDefinition(), getLifecycleDate(), getPhaseTime(), getActionTime(), getStepTime(),
-            getSnapshotRepository(), getSnapshotName());
+            getSnapshotRepository(), getSnapshotName(), getSnapshotIndexName());
     }
 
     @Override
@@ -357,6 +373,7 @@ public class LifecycleExecutionState {
         private Integer failedStepRetryCount;
         private String snapshotName;
         private String snapshotRepository;
+        private String snapshotIndexName;
 
         public Builder setPhase(String phase) {
             this.phase = phase;
@@ -428,9 +445,14 @@ public class LifecycleExecutionState {
             return this;
         }
 
+        public Builder setSnapshotIndexName(String snapshotIndexName) {
+            this.snapshotIndexName = snapshotIndexName;
+            return this;
+        }
+
         public LifecycleExecutionState build() {
             return new LifecycleExecutionState(phase, action, step, failedStep, isAutoRetryableError, failedStepRetryCount, stepInfo,
-                phaseDefinition, indexCreationDate, phaseTime, actionTime, stepTime, snapshotRepository, snapshotName);
+                phaseDefinition, indexCreationDate, phaseTime, actionTime, stepTime, snapshotRepository, snapshotName, snapshotIndexName);
         }
     }
 

+ 43 - 7
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/MountSnapshotStep.java

@@ -34,10 +34,13 @@ public class MountSnapshotStep extends AsyncRetryDuringSnapshotActionStep {
     private static final Logger logger = LogManager.getLogger(MountSnapshotStep.class);
 
     private final String restoredIndexPrefix;
+    private final MountSearchableSnapshotRequest.Storage storageType;
 
-    public MountSnapshotStep(StepKey key, StepKey nextStepKey, Client client, String restoredIndexPrefix) {
+    public MountSnapshotStep(StepKey key, StepKey nextStepKey, Client client, String restoredIndexPrefix,
+                             MountSearchableSnapshotRequest.Storage storageType) {
         super(key, nextStepKey, client);
         this.restoredIndexPrefix = restoredIndexPrefix;
+        this.storageType = Objects.requireNonNull(storageType, "a storage type must be specified");
     }
 
     @Override
@@ -49,9 +52,13 @@ public class MountSnapshotStep extends AsyncRetryDuringSnapshotActionStep {
         return restoredIndexPrefix;
     }
 
+    public MountSearchableSnapshotRequest.Storage getStorage() {
+        return storageType;
+    }
+
     @Override
     void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentClusterState, Listener listener) {
-        final String indexName = indexMetadata.getIndex().getName();
+        String indexName = indexMetadata.getIndex().getName();
 
         LifecycleExecutionState lifecycleState = fromIndexMetadata(indexMetadata);
 
@@ -71,13 +78,29 @@ public class MountSnapshotStep extends AsyncRetryDuringSnapshotActionStep {
         }
 
         String mountedIndexName = restoredIndexPrefix + indexName;
-        if(currentClusterState.metadata().index(mountedIndexName) != null) {
+        if (currentClusterState.metadata().index(mountedIndexName) != null) {
             logger.debug("mounted index [{}] for policy [{}] and index [{}] already exists. will not attempt to mount the index again",
                 mountedIndexName, policyName, indexName);
             listener.onResponse(true);
             return;
         }
 
+        final String snapshotIndexName = lifecycleState.getSnapshotIndexName();
+        if (snapshotIndexName == null) {
+            // This index had its searchable snapshot created prior to a version where we captured
+            // the original index name, so make our best guess at the name
+            indexName = bestEffortIndexNameResolution(indexName);
+            logger.debug("index [{}] using policy [{}] does not have a stored snapshot index name, " +
+                "using our best effort guess of [{}] for the original snapshotted index name",
+                indexMetadata.getIndex().getName(), policyName, indexName);
+        } else {
+            // Use the name of the snapshot as specified in the metadata, because the current index
+            // name not might not reflect the name of the index actually in the snapshot
+            logger.debug("index [{}] using policy [{}] has a different name [{}] within the snapshot to be restored, " +
+                "using the snapshot index name from generated metadata for mounting", indexName, policyName, snapshotIndexName);
+            indexName = snapshotIndexName;
+        }
+
         final MountSearchableSnapshotRequest mountSearchableSnapshotRequest = new MountSearchableSnapshotRequest(mountedIndexName,
             snapshotRepository, snapshotName, indexName, Settings.builder()
             .put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), Boolean.FALSE.toString())
@@ -91,8 +114,7 @@ public class MountSnapshotStep extends AsyncRetryDuringSnapshotActionStep {
             // we'll not wait for the snapshot to complete in this step as the async steps are executed from threads that shouldn't
             // perform expensive operations (ie. clusterStateProcessed)
             false,
-            // restoring into the cold tier, so use a full local copy
-            MountSearchableSnapshotRequest.Storage.FULL_COPY);
+            storageType);
         getClient().execute(MountSearchableSnapshotAction.INSTANCE, mountSearchableSnapshotRequest,
             ActionListener.wrap(response -> {
                 if (response.status() != RestStatus.OK && response.status() != RestStatus.ACCEPTED) {
@@ -103,9 +125,21 @@ public class MountSnapshotStep extends AsyncRetryDuringSnapshotActionStep {
             }, listener::onFailure));
     }
 
+    /**
+     * Tries to guess the original index name given the current index name, tries to drop the
+     * "partial-" and "restored-" prefixes, since those are what ILM uses. Does not handle
+     * unorthodox cases like "restored-partial-[indexname]" since this is not intended to be
+     * exhaustive.
+     */
+    static String bestEffortIndexNameResolution(String indexName) {
+        String originalName = indexName.replaceFirst("^" + SearchableSnapshotAction.PARTIAL_RESTORED_INDEX_PREFIX, "");
+        originalName = originalName.replaceFirst("^" + SearchableSnapshotAction.FULL_RESTORED_INDEX_PREFIX, "");
+        return originalName;
+    }
+
     @Override
     public int hashCode() {
-        return Objects.hash(super.hashCode(), restoredIndexPrefix);
+        return Objects.hash(super.hashCode(), restoredIndexPrefix, storageType);
     }
 
     @Override
@@ -117,6 +151,8 @@ public class MountSnapshotStep extends AsyncRetryDuringSnapshotActionStep {
             return false;
         }
         MountSnapshotStep other = (MountSnapshotStep) obj;
-        return super.equals(obj) && Objects.equals(restoredIndexPrefix, other.restoredIndexPrefix);
+        return super.equals(obj) &&
+            Objects.equals(restoredIndexPrefix, other.restoredIndexPrefix) &&
+            Objects.equals(storageType, other.storageType);
     }
 }

+ 168 - 25
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SearchableSnapshotAction.java

@@ -13,6 +13,7 @@ import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.health.ClusterHealthStatus;
 import org.elasticsearch.cluster.metadata.IndexAbstraction;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.ParseField;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.io.stream.StreamInput;
@@ -24,6 +25,7 @@ import org.elasticsearch.license.LicenseUtils;
 import org.elasticsearch.license.XPackLicenseState;
 import org.elasticsearch.xpack.core.XPackPlugin;
 import org.elasticsearch.xpack.core.ilm.Step.StepKey;
+import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotRequest;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -42,17 +44,30 @@ public class SearchableSnapshotAction implements LifecycleAction {
 
     public static final ParseField SNAPSHOT_REPOSITORY = new ParseField("snapshot_repository");
     public static final ParseField FORCE_MERGE_INDEX = new ParseField("force_merge_index");
+    public static final ParseField STORAGE = new ParseField("storage");
     public static final String CONDITIONAL_DATASTREAM_CHECK_KEY = BranchingStep.NAME + "-on-datastream-check";
     public static final String CONDITIONAL_SKIP_ACTION_STEP = BranchingStep.NAME + "-check-prerequisites";
+    public static final String CONDITIONAL_SKIP_GENERATE_AND_CLEAN = BranchingStep.NAME + "-check-existing-snapshot";
 
-    public static final String RESTORED_INDEX_PREFIX = "restored-";
+    public static final String FULL_RESTORED_INDEX_PREFIX = "restored-";
+    public static final String PARTIAL_RESTORED_INDEX_PREFIX = "partial-";
 
     private static final ConstructingObjectParser<SearchableSnapshotAction, Void> PARSER = new ConstructingObjectParser<>(NAME,
-        a -> new SearchableSnapshotAction((String) a[0], a[1] == null || (boolean) a[1]));
+        a -> {
+            String storageName = (String) a[2];
+            final MountSearchableSnapshotRequest.Storage storageType;
+            if (storageName == null) {
+                storageType = null;
+            } else {
+                storageType = MountSearchableSnapshotRequest.Storage.fromString(storageName);
+            }
+            return new SearchableSnapshotAction((String) a[0], a[1] == null || (boolean) a[1], storageType);
+        });
 
     static {
         PARSER.declareString(ConstructingObjectParser.constructorArg(), SNAPSHOT_REPOSITORY);
         PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), FORCE_MERGE_INDEX);
+        PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), STORAGE);
     }
 
 
@@ -62,27 +77,46 @@ public class SearchableSnapshotAction implements LifecycleAction {
 
     private final String snapshotRepository;
     private final boolean forceMergeIndex;
+    @Nullable
+    private final MountSearchableSnapshotRequest.Storage storageType;
 
-    public SearchableSnapshotAction(String snapshotRepository, boolean forceMergeIndex) {
+    public SearchableSnapshotAction(String snapshotRepository, boolean forceMergeIndex,
+                                    @Nullable MountSearchableSnapshotRequest.Storage type) {
         if (Strings.hasText(snapshotRepository) == false) {
             throw new IllegalArgumentException("the snapshot repository must be specified");
         }
         this.snapshotRepository = snapshotRepository;
         this.forceMergeIndex = forceMergeIndex;
+        this.storageType = type;
     }
 
     public SearchableSnapshotAction(String snapshotRepository) {
-        this(snapshotRepository, true);
+        this(snapshotRepository, true, null);
     }
 
     public SearchableSnapshotAction(StreamInput in) throws IOException {
-        this(in.readString(), in.getVersion().onOrAfter(Version.V_7_10_0) ? in.readBoolean() : true);
+        this.snapshotRepository = in.readString();
+        if (in.getVersion().onOrAfter(Version.V_7_10_0)) {
+            this.forceMergeIndex = in.readBoolean();
+        } else {
+            this.forceMergeIndex = true;
+        }
+        if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
+            this.storageType = in.readOptionalEnum(MountSearchableSnapshotRequest.Storage.class);
+        } else {
+            this.storageType = null;
+        }
     }
 
     boolean isForceMergeIndex() {
         return forceMergeIndex;
     }
 
+    @Nullable
+    public MountSearchableSnapshotRequest.Storage getStorageType() {
+        return storageType;
+    }
+
     @Override
     public List<Step> toSteps(Client client, String phase, StepKey nextStepKey) {
         StepKey preActionBranchingKey = new StepKey(phase, NAME, CONDITIONAL_SKIP_ACTION_STEP);
@@ -90,6 +124,7 @@ public class SearchableSnapshotAction implements LifecycleAction {
         StepKey waitForNoFollowerStepKey = new StepKey(phase, NAME, WaitForNoFollowersStep.NAME);
         StepKey forceMergeStepKey = new StepKey(phase, NAME, ForceMergeStep.NAME);
         StepKey waitForSegmentCountKey = new StepKey(phase, NAME, SegmentCountStep.NAME);
+        StepKey skipGeneratingSnapshotKey = new StepKey(phase, NAME, CONDITIONAL_SKIP_GENERATE_AND_CLEAN);
         StepKey generateSnapshotNameKey = new StepKey(phase, NAME, GenerateSnapshotNameStep.NAME);
         StepKey cleanSnapshotKey = new StepKey(phase, NAME, CleanupSnapshotStep.NAME);
         StepKey createSnapshotKey = new StepKey(phase, NAME, CreateSnapshotStep.NAME);
@@ -102,6 +137,10 @@ public class SearchableSnapshotAction implements LifecycleAction {
         StepKey replaceDataStreamIndexKey = new StepKey(phase, NAME, ReplaceDataStreamBackingIndexStep.NAME);
         StepKey deleteIndexKey = new StepKey(phase, NAME, DeleteStep.NAME);
 
+        // Before going through all these steps, first check if we need to do them at all. For example, the index could already be
+        // a searchable snapshot of the same type and repository, in which case we don't need to do anything. If that is detected,
+        // this branching step jumps right to the end, skipping the searchable snapshot action entirely. We also check the license
+        // here before generating snapshots that can't be used if the user doesn't have the right license level.
         BranchingStep conditionalSkipActionStep = new BranchingStep(preActionBranchingKey, checkNoWriteIndex, nextStepKey,
             (index, clusterState) -> {
                 XPackLicenseState licenseState = XPackPlugin.getSharedLicenseState();
@@ -112,22 +151,82 @@ public class SearchableSnapshotAction implements LifecycleAction {
 
                 IndexMetadata indexMetadata = clusterState.getMetadata().index(index);
                 assert indexMetadata != null : "index " + index.getName() + " must exist in the cluster state";
+                String policyName = LifecycleSettings.LIFECYCLE_NAME_SETTING.get(indexMetadata.getSettings());
                 if (indexMetadata.getSettings().get(LifecycleSettings.SNAPSHOT_INDEX_NAME) != null) {
-                    logger.warn("[{}] action is configured for index [{}] in policy [{}] which is already mounted as searchable " +
-                            "snapshot. Skipping this action", SearchableSnapshotAction.NAME, index.getName(),
-                        LifecycleSettings.LIFECYCLE_NAME_SETTING.get(indexMetadata.getSettings()));
-                    return true;
+                    // The index is already a searchable snapshot, let's see if the repository matches
+                    // TODO: move the searchable snapshot settings into x-pack
+                    //  core in the future, so the Settings can be used instead
+                    //  of strings here
+                    String repo = indexMetadata.getSettings().get("index.store.snapshot.repository_name");
+                    if (this.snapshotRepository.equals(repo) == false) {
+                        // Okay, different repo, we need to go ahead with the searchable snapshot
+                        logger.debug("[{}] action is configured for index [{}] in policy [{}] which is already mounted as a searchable " +
+                                "snapshot, but with a different repository (existing: [{}] vs new: [{}]), a new snapshot and " +
+                                "index will be created",
+                            SearchableSnapshotAction.NAME, index.getName(), policyName, repo, this.snapshotRepository);
+                        return false;
+                    }
+
+                    // Check to the storage type to see if we need to convert between full <-> partial
+                    boolean partial = indexMetadata.getSettings().getAsBoolean("index.store.snapshot.partial", false);
+                    MountSearchableSnapshotRequest.Storage existingType =
+                        partial ? MountSearchableSnapshotRequest.Storage.SHARED_CACHE : MountSearchableSnapshotRequest.Storage.FULL_COPY;
+                    MountSearchableSnapshotRequest.Storage type = getConcreteStorageType(preActionBranchingKey);
+                    if (existingType == type) {
+                        logger.debug("[{}] action is configured for index [{}] in policy [{}] which is already mounted " +
+                            "as a searchable snapshot with the same repository [{}] and storage type [{}], skipping this action",
+                            SearchableSnapshotAction.NAME, index.getName(), policyName, repo, type);
+                        return true;
+                    }
+
+                    logger.debug("[{}] action is configured for index [{}] in policy [{}] which is already mounted " +
+                        "as a searchable snapshot in repository [{}], however, the storage type ([{}] vs [{}]) " +
+                        "differs, so a new index will be created",
+                        SearchableSnapshotAction.NAME, index.getName(), policyName, this.snapshotRepository, existingType, type);
+                    // Perform the searchable snapshot
+                    return false;
                 }
+                // Perform the searchable snapshot, as the index is not currently a searchable snapshot
                 return false;
             });
-        CheckNotDataStreamWriteIndexStep checkNoWriteIndexStep = new CheckNotDataStreamWriteIndexStep(checkNoWriteIndex,
-            waitForNoFollowerStepKey);
-        final WaitForNoFollowersStep waitForNoFollowersStep;
-        if (forceMergeIndex) {
-            waitForNoFollowersStep = new WaitForNoFollowersStep(waitForNoFollowerStepKey, forceMergeStepKey, client);
-        } else {
-            waitForNoFollowersStep = new WaitForNoFollowersStep(waitForNoFollowerStepKey, generateSnapshotNameKey, client);
-        }
+        CheckNotDataStreamWriteIndexStep checkNoWriteIndexStep =
+            new CheckNotDataStreamWriteIndexStep(checkNoWriteIndex, waitForNoFollowerStepKey);
+        WaitForNoFollowersStep waitForNoFollowersStep =
+            new WaitForNoFollowersStep(waitForNoFollowerStepKey, skipGeneratingSnapshotKey, client);
+
+        // When generating a snapshot, we either jump to the force merge step, or we skip the
+        // forcemerge and go straight to steps for creating the snapshot
+        StepKey keyForSnapshotGeneration = forceMergeIndex ? forceMergeStepKey : generateSnapshotNameKey;
+        // Branch, deciding whether there is an existing searchable snapshot snapshot that can be used for mounting the index
+        // (in which case, skip generating a new name and the snapshot cleanup), or if we need to generate a new snapshot
+        BranchingStep skipGeneratingSnapshotStep =
+            new BranchingStep(skipGeneratingSnapshotKey, keyForSnapshotGeneration, mountSnapshotKey, (index, clusterState) -> {
+                IndexMetadata indexMetadata = clusterState.getMetadata().index(index);
+                String policyName = LifecycleSettings.LIFECYCLE_NAME_SETTING.get(indexMetadata.getSettings());
+                LifecycleExecutionState lifecycleExecutionState = LifecycleExecutionState.fromIndexMetadata(indexMetadata);
+                if (lifecycleExecutionState.getSnapshotName() == null) {
+                    // No name exists, so it must be generated
+                    logger.trace("no snapshot name for index [{}] in policy [{}] exists, so one will be generated",
+                        index.getName(), policyName);
+                    return false;
+                }
+
+                if (this.snapshotRepository.equals(lifecycleExecutionState.getSnapshotRepository()) == false) {
+                    // A different repository is being used
+                    // TODO: allow this behavior instead of throwing an exception
+                    throw new IllegalArgumentException("searchable snapshot indices may be converted only within the same repository");
+                }
+
+                // We can skip the generate, initial cleanup, and snapshot taking for this index, as we already have a generated snapshot.
+                // This will jump ahead directly to the "mount snapshot" step
+                logger.debug("an existing snapshot [{}] in repository [{}] (index name: [{}]) " +
+                        "will be used for mounting [{}] as a searchable snapshot",
+                    lifecycleExecutionState.getSnapshotName(), lifecycleExecutionState.getSnapshotRepository(),
+                    lifecycleExecutionState.getSnapshotIndexName(), index.getName());
+                return true;
+            });
+
+        // If a new snapshot is needed, these steps are executed
         ForceMergeStep forceMergeStep = new ForceMergeStep(forceMergeStepKey, waitForSegmentCountKey, client, 1);
         SegmentCountStep segmentCountStep = new SegmentCountStep(waitForSegmentCountKey, generateSnapshotNameKey, client, 1);
         GenerateSnapshotNameStep generateSnapshotNameStep = new GenerateSnapshotNameStep(generateSnapshotNameKey, cleanSnapshotKey,
@@ -135,16 +234,19 @@ public class SearchableSnapshotAction implements LifecycleAction {
         CleanupSnapshotStep cleanupSnapshotStep = new CleanupSnapshotStep(cleanSnapshotKey, createSnapshotKey, client);
         AsyncActionBranchingStep createSnapshotBranchingStep = new AsyncActionBranchingStep(
             new CreateSnapshotStep(createSnapshotKey, mountSnapshotKey, client), cleanSnapshotKey, client);
+
+        // Now mount the snapshot to create the new index, if the skipGeneratingSnapshotStep determined a snapshot already existed that
+        // can be used, it jumps directly here, skipping the snapshot generation steps above.
         MountSnapshotStep mountSnapshotStep = new MountSnapshotStep(mountSnapshotKey, waitForGreenRestoredIndexKey,
-            client, RESTORED_INDEX_PREFIX);
+            client, getRestoredIndexPrefix(mountSnapshotKey), getConcreteStorageType(mountSnapshotKey));
         WaitForIndexColorStep waitForGreenIndexHealthStep = new WaitForIndexColorStep(waitForGreenRestoredIndexKey,
-            copyMetadataKey, ClusterHealthStatus.GREEN, RESTORED_INDEX_PREFIX);
+            copyMetadataKey, ClusterHealthStatus.GREEN, getRestoredIndexPrefix(waitForGreenRestoredIndexKey));
         // a policy with only the cold phase will have a null "nextStepKey", hence the "null" nextStepKey passed in below when that's the
         // case
         CopyExecutionStateStep copyMetadataStep = new CopyExecutionStateStep(copyMetadataKey, copyLifecyclePolicySettingKey,
-            RESTORED_INDEX_PREFIX, nextStepKey != null ? nextStepKey.getName() : "null");
+            getRestoredIndexPrefix(copyMetadataKey), nextStepKey != null ? nextStepKey.getName() : "null");
         CopySettingsStep copySettingsStep = new CopySettingsStep(copyLifecyclePolicySettingKey, dataStreamCheckBranchingKey,
-            RESTORED_INDEX_PREFIX, LifecycleSettings.LIFECYCLE_NAME);
+            getRestoredIndexPrefix(copyLifecyclePolicySettingKey), LifecycleSettings.LIFECYCLE_NAME);
         BranchingStep isDataStreamBranchingStep = new BranchingStep(dataStreamCheckBranchingKey, swapAliasesKey, replaceDataStreamIndexKey,
             (index, clusterState) -> {
                 IndexAbstraction indexAbstraction = clusterState.metadata().getIndicesLookup().get(index.getName());
@@ -152,17 +254,18 @@ public class SearchableSnapshotAction implements LifecycleAction {
                 return indexAbstraction.getParentDataStream() != null;
             });
         ReplaceDataStreamBackingIndexStep replaceDataStreamBackingIndex = new ReplaceDataStreamBackingIndexStep(replaceDataStreamIndexKey,
-            deleteIndexKey, RESTORED_INDEX_PREFIX);
+            deleteIndexKey, getRestoredIndexPrefix(replaceDataStreamIndexKey));
         DeleteStep deleteSourceIndexStep = new DeleteStep(deleteIndexKey, null, client);
         // sending this step to null as the restored index (which will after this step essentially be the source index) was sent to the next
         // key after we restored the lifecycle execution state
         SwapAliasesAndDeleteSourceIndexStep swapAliasesAndDeleteSourceIndexStep = new SwapAliasesAndDeleteSourceIndexStep(swapAliasesKey,
-            null, client, RESTORED_INDEX_PREFIX);
+            null, client, getRestoredIndexPrefix(swapAliasesKey));
 
         List<Step> steps = new ArrayList<>();
         steps.add(conditionalSkipActionStep);
         steps.add(checkNoWriteIndexStep);
         steps.add(waitForNoFollowersStep);
+        steps.add(skipGeneratingSnapshotStep);
         if (forceMergeIndex) {
             steps.add(forceMergeStep);
             steps.add(segmentCountStep);
@@ -181,6 +284,39 @@ public class SearchableSnapshotAction implements LifecycleAction {
         return steps;
     }
 
+    /**
+     * Resolves the prefix to be used for the mounted index depending on the provided key
+     */
+    String getRestoredIndexPrefix(StepKey currentKey) {
+        if (storageType == null) {
+            if (currentKey.getPhase().equals(TimeseriesLifecycleType.FROZEN_PHASE)) {
+                return PARTIAL_RESTORED_INDEX_PREFIX;
+            } else {
+                return FULL_RESTORED_INDEX_PREFIX;
+            }
+        }
+        switch (storageType) {
+            case FULL_COPY:
+                return FULL_RESTORED_INDEX_PREFIX;
+            case SHARED_CACHE:
+                return PARTIAL_RESTORED_INDEX_PREFIX;
+            default:
+                throw new IllegalArgumentException("unexpected storage type: " + storageType);
+        }
+    }
+
+    // Resolves the storage type from a Nullable to non-Nullable type
+    MountSearchableSnapshotRequest.Storage getConcreteStorageType(StepKey currentKey) {
+        if (storageType != null) {
+            return storageType;
+        }
+        if (currentKey.getPhase().equals(TimeseriesLifecycleType.FROZEN_PHASE)) {
+            return MountSearchableSnapshotRequest.Storage.SHARED_CACHE;
+        } else {
+            return MountSearchableSnapshotRequest.Storage.FULL_COPY;
+        }
+    }
+
     @Override
     public boolean isSafeAction() {
         return true;
@@ -197,6 +333,9 @@ public class SearchableSnapshotAction implements LifecycleAction {
         if (out.getVersion().onOrAfter(Version.V_7_10_0)) {
             out.writeBoolean(forceMergeIndex);
         }
+        if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
+            out.writeOptionalEnum(storageType);
+        }
     }
 
     @Override
@@ -204,6 +343,9 @@ public class SearchableSnapshotAction implements LifecycleAction {
         builder.startObject();
         builder.field(SNAPSHOT_REPOSITORY.getPreferredName(), snapshotRepository);
         builder.field(FORCE_MERGE_INDEX.getPreferredName(), forceMergeIndex);
+        if (storageType != null) {
+            builder.field(STORAGE.getPreferredName(), storageType);
+        }
         builder.endObject();
         return builder;
     }
@@ -217,11 +359,12 @@ public class SearchableSnapshotAction implements LifecycleAction {
             return false;
         }
         SearchableSnapshotAction that = (SearchableSnapshotAction) o;
-        return Objects.equals(snapshotRepository, that.snapshotRepository);
+        return Objects.equals(snapshotRepository, that.snapshotRepository) &&
+            Objects.equals(storageType, that.storageType);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(snapshotRepository);
+        return Objects.hash(snapshotRepository, storageType);
     }
 }

+ 16 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/searchablesnapshots/MountSearchableSnapshotRequest.java

@@ -234,6 +234,22 @@ public class MountSearchableSnapshotRequest extends MasterNodeRequest<MountSearc
         FULL_COPY,
         SHARED_CACHE;
 
+        public static Storage fromString(String type) {
+            if ("full_copy".equals(type)) {
+                return FULL_COPY;
+            } else if ("shared_cache".equals(type)) {
+                return SHARED_CACHE;
+            } else {
+                throw new IllegalArgumentException("unknown searchable snapshot storage type [" + type + "], valid types are: " +
+                    Strings.arrayToCommaDelimitedString(Storage.values()));
+            }
+        }
+
+        @Override
+        public String toString() {
+            return super.toString().toLowerCase(Locale.ROOT);
+        }
+
         public static Storage readFromStream(StreamInput in) throws IOException {
             return in.readEnum(Storage.class);
         }

+ 2 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/LifecycleExecutionStateTests.java

@@ -195,6 +195,7 @@ public class LifecycleExecutionStateTests extends ESTestCase {
         String phaseDefinition = randomAlphaOfLengthBetween(15, 50);
         String repositoryName = randomAlphaOfLengthBetween(10, 20);
         String snapshotName = randomAlphaOfLengthBetween(10, 20);
+        String snapshotIndexName = randomAlphaOfLengthBetween(10, 20);
         long indexCreationDate = randomLong();
         long phaseTime = randomLong();
         long actionTime = randomLong();
@@ -213,6 +214,7 @@ public class LifecycleExecutionStateTests extends ESTestCase {
         customMetadata.put("step_time", String.valueOf(stepTime));
         customMetadata.put("snapshot_repository", repositoryName);
         customMetadata.put("snapshot_name", snapshotName);
+        customMetadata.put("snapshot_index_name", snapshotIndexName);
         return customMetadata;
     }
 }

+ 102 - 9
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/MountSnapshotStepTests.java

@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Map;
 
 import static org.elasticsearch.xpack.core.ilm.AbstractStepMasterTimeoutTestCase.emptyClusterState;
+import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
@@ -39,12 +40,22 @@ public class MountSnapshotStepTests extends AbstractStepTestCase<MountSnapshotSt
         StepKey stepKey = randomStepKey();
         StepKey nextStepKey = randomStepKey();
         String restoredIndexPrefix = randomAlphaOfLength(10);
-        return new MountSnapshotStep(stepKey, nextStepKey, client, restoredIndexPrefix);
+        MountSearchableSnapshotRequest.Storage storage = randomStorageType();
+        return new MountSnapshotStep(stepKey, nextStepKey, client, restoredIndexPrefix, storage);
+    }
+
+    public static MountSearchableSnapshotRequest.Storage randomStorageType() {
+        if (randomBoolean()) {
+            return MountSearchableSnapshotRequest.Storage.FULL_COPY;
+        } else {
+            return MountSearchableSnapshotRequest.Storage.SHARED_CACHE;
+        }
     }
 
     @Override
     protected MountSnapshotStep copyInstance(MountSnapshotStep instance) {
-        return new MountSnapshotStep(instance.getKey(), instance.getNextStepKey(), instance.getClient(), instance.getRestoredIndexPrefix());
+        return new MountSnapshotStep(instance.getKey(), instance.getNextStepKey(), instance.getClient(), instance.getRestoredIndexPrefix(),
+            instance.getStorage());
     }
 
     @Override
@@ -52,7 +63,8 @@ public class MountSnapshotStepTests extends AbstractStepTestCase<MountSnapshotSt
         StepKey key = instance.getKey();
         StepKey nextKey = instance.getNextStepKey();
         String restoredIndexPrefix = instance.getRestoredIndexPrefix();
-        switch (between(0, 2)) {
+        MountSearchableSnapshotRequest.Storage storage = instance.getStorage();
+        switch (between(0, 3)) {
             case 0:
                 key = new StepKey(key.getPhase(), key.getAction(), key.getName() + randomAlphaOfLength(5));
                 break;
@@ -62,10 +74,19 @@ public class MountSnapshotStepTests extends AbstractStepTestCase<MountSnapshotSt
             case 2:
                 restoredIndexPrefix = randomValueOtherThan(restoredIndexPrefix, () -> randomAlphaOfLengthBetween(1, 10));
                 break;
+            case 3:
+                if (storage == MountSearchableSnapshotRequest.Storage.FULL_COPY) {
+                    storage = MountSearchableSnapshotRequest.Storage.SHARED_CACHE;
+                } else if (storage == MountSearchableSnapshotRequest.Storage.SHARED_CACHE) {
+                    storage = MountSearchableSnapshotRequest.Storage.FULL_COPY;
+                } else {
+                    throw new AssertionError("unknown storage type: " + storage);
+                }
+                break;
             default:
                 throw new AssertionError("Illegal randomisation branch");
         }
-        return new MountSnapshotStep(key, nextKey, instance.getClient(), restoredIndexPrefix);
+        return new MountSnapshotStep(key, nextKey, instance.getClient(), restoredIndexPrefix, storage);
     }
 
     public void testPerformActionFailure() {
@@ -145,8 +166,10 @@ public class MountSnapshotStepTests extends AbstractStepTestCase<MountSnapshotSt
         ClusterState clusterState =
             ClusterState.builder(emptyClusterState()).metadata(Metadata.builder().put(indexMetadata, true).build()).build();
 
-        try (NoOpClient client = getRestoreSnapshotRequestAssertingClient(repository, snapshotName, indexName, RESTORED_INDEX_PREFIX)) {
-            MountSnapshotStep step = new MountSnapshotStep(randomStepKey(), randomStepKey(), client, RESTORED_INDEX_PREFIX);
+        try (NoOpClient client =
+                 getRestoreSnapshotRequestAssertingClient(repository, snapshotName, indexName, RESTORED_INDEX_PREFIX, indexName)) {
+            MountSnapshotStep step =
+                new MountSnapshotStep(randomStepKey(), randomStepKey(), client, RESTORED_INDEX_PREFIX, randomStorageType());
             step.performAction(indexMetadata, clusterState, null, new AsyncActionStep.Listener() {
                 @Override
                 public void onResponse(boolean complete) {
@@ -183,7 +206,7 @@ public class MountSnapshotStepTests extends AbstractStepTestCase<MountSnapshotSt
             RestoreSnapshotResponse responseWithOKStatus = new RestoreSnapshotResponse(new RestoreInfo("test", List.of(), 1, 1));
             try (NoOpClient clientPropagatingOKResponse = getClientTriggeringResponse(responseWithOKStatus)) {
                 MountSnapshotStep step = new MountSnapshotStep(randomStepKey(), randomStepKey(), clientPropagatingOKResponse,
-                    RESTORED_INDEX_PREFIX);
+                    RESTORED_INDEX_PREFIX, randomStorageType());
                 step.performAction(indexMetadata, clusterState, null, new AsyncActionStep.Listener() {
                     @Override
                     public void onResponse(boolean complete) {
@@ -202,7 +225,76 @@ public class MountSnapshotStepTests extends AbstractStepTestCase<MountSnapshotSt
             RestoreSnapshotResponse responseWithACCEPTEDStatus = new RestoreSnapshotResponse((RestoreInfo) null);
             try (NoOpClient clientPropagatingACCEPTEDResponse = getClientTriggeringResponse(responseWithACCEPTEDStatus)) {
                 MountSnapshotStep step = new MountSnapshotStep(randomStepKey(), randomStepKey(), clientPropagatingACCEPTEDResponse,
-                    RESTORED_INDEX_PREFIX);
+                    RESTORED_INDEX_PREFIX, randomStorageType());
+                step.performAction(indexMetadata, clusterState, null, new AsyncActionStep.Listener() {
+                    @Override
+                    public void onResponse(boolean complete) {
+                        assertThat(complete, is(true));
+                    }
+
+                    @Override
+                    public void onFailure(Exception e) {
+                        fail("expecting successful response but got: [" + e.getMessage() + "]");
+                    }
+                });
+            }
+        }
+    }
+
+    public void testBestEffortNameResolution() {
+        assertThat(MountSnapshotStep.bestEffortIndexNameResolution("potato"), equalTo("potato"));
+        assertThat(MountSnapshotStep.bestEffortIndexNameResolution("restored-potato"), equalTo("potato"));
+        assertThat(MountSnapshotStep.bestEffortIndexNameResolution("partial-potato"), equalTo("potato"));
+        assertThat(MountSnapshotStep.bestEffortIndexNameResolution("partial-restored-potato"), equalTo("potato"));
+        assertThat(MountSnapshotStep.bestEffortIndexNameResolution("restored-partial-potato"), equalTo("partial-potato"));
+        assertThat(MountSnapshotStep.bestEffortIndexNameResolution("my-restored-potato"), equalTo("my-restored-potato"));
+        assertThat(MountSnapshotStep.bestEffortIndexNameResolution("my-partial-potato"), equalTo("my-partial-potato"));
+        assertThat(MountSnapshotStep.bestEffortIndexNameResolution("my-partial-restored-potato"), equalTo("my-partial-restored-potato"));
+        assertThat(MountSnapshotStep.bestEffortIndexNameResolution("my-restored-partial-potato"), equalTo("my-restored-partial-potato"));
+    }
+
+    public void testMountWithNoPrefix() {
+        doTestMountWithoutSnapshotIndexNameInState("");
+    }
+
+    public void testMountWithRestorePrefix() {
+        doTestMountWithoutSnapshotIndexNameInState(SearchableSnapshotAction.FULL_RESTORED_INDEX_PREFIX);
+    }
+
+    public void testMountWithPartialPrefix() {
+        doTestMountWithoutSnapshotIndexNameInState(SearchableSnapshotAction.PARTIAL_RESTORED_INDEX_PREFIX);
+    }
+
+    public void testMountWithPartialAndRestoredPrefix() {
+        doTestMountWithoutSnapshotIndexNameInState(SearchableSnapshotAction.PARTIAL_RESTORED_INDEX_PREFIX +
+            SearchableSnapshotAction.FULL_RESTORED_INDEX_PREFIX);
+    }
+
+    public void doTestMountWithoutSnapshotIndexNameInState(String prefix) {
+        {
+            String indexNameSnippet = randomAlphaOfLength(10);
+            String indexName = prefix + indexNameSnippet;
+            String policyName = "test-ilm-policy";
+            Map<String, String> ilmCustom = new HashMap<>();
+            String snapshotName = indexName + "-" + policyName;
+            ilmCustom.put("snapshot_name", snapshotName);
+            String repository = "repository";
+            ilmCustom.put("snapshot_repository", repository);
+
+            IndexMetadata.Builder indexMetadataBuilder =
+                IndexMetadata.builder(indexName).settings(settings(Version.CURRENT).put(LifecycleSettings.LIFECYCLE_NAME, policyName))
+                    .putCustom(LifecycleExecutionState.ILM_CUSTOM_METADATA_KEY, ilmCustom)
+                    .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5));
+            IndexMetadata indexMetadata = indexMetadataBuilder.build();
+
+            ClusterState clusterState =
+                ClusterState.builder(emptyClusterState()).metadata(Metadata.builder().put(indexMetadata, true).build()).build();
+
+            try (NoOpClient client =
+                     getRestoreSnapshotRequestAssertingClient(repository, snapshotName,
+                         indexName, RESTORED_INDEX_PREFIX, indexNameSnippet)) {
+                MountSnapshotStep step =
+                    new MountSnapshotStep(randomStepKey(), randomStepKey(), client, RESTORED_INDEX_PREFIX, randomStorageType());
                 step.performAction(indexMetadata, clusterState, null, new AsyncActionStep.Listener() {
                     @Override
                     public void onResponse(boolean complete) {
@@ -232,7 +324,7 @@ public class MountSnapshotStepTests extends AbstractStepTestCase<MountSnapshotSt
 
     @SuppressWarnings("unchecked")
     private NoOpClient getRestoreSnapshotRequestAssertingClient(String expectedRepoName, String expectedSnapshotName, String indexName,
-                                                                String restoredIndexPrefix) {
+                                                                String restoredIndexPrefix, String expectedSnapshotIndexName) {
         return new NoOpClient(getTestName()) {
             @Override
             protected <Request extends ActionRequest, Response extends ActionResponse> void doExecute(ActionType<Response> action,
@@ -248,6 +340,7 @@ public class MountSnapshotStepTests extends AbstractStepTestCase<MountSnapshotSt
                 assertThat(mountSearchableSnapshotRequest.ignoreIndexSettings(), is(notNullValue()));
                 assertThat(mountSearchableSnapshotRequest.ignoreIndexSettings()[0], is(LifecycleSettings.LIFECYCLE_NAME));
                 assertThat(mountSearchableSnapshotRequest.mountedIndexName(), is(restoredIndexPrefix + indexName));
+                assertThat(mountSearchableSnapshotRequest.snapshotIndexName(), is(expectedSnapshotIndexName));
             }
         };
     }

+ 54 - 7
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/SearchableSnapshotActionTests.java

@@ -6,14 +6,17 @@
  */
 package org.elasticsearch.xpack.core.ilm;
 
+import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.xpack.core.ilm.Step.StepKey;
+import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotRequest;
 
 import java.io.IOException;
 import java.util.List;
 
 import static org.elasticsearch.xpack.core.ilm.SearchableSnapshotAction.NAME;
+import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 
 public class SearchableSnapshotActionTests extends AbstractActionTestCase<SearchableSnapshotAction> {
@@ -25,7 +28,7 @@ public class SearchableSnapshotActionTests extends AbstractActionTestCase<Search
         StepKey nextStepKey = new StepKey(phase, randomAlphaOfLengthBetween(1, 5), randomAlphaOfLengthBetween(1, 5));
 
         List<Step> steps = action.toSteps(null, phase, nextStepKey);
-        assertThat(steps.size(), is(action.isForceMergeIndex() ? 16 : 14));
+        assertThat(steps.size(), is(action.isForceMergeIndex() ? 17 : 15));
 
         List<StepKey> expectedSteps = action.isForceMergeIndex() ? expectedStepKeysWithForceMerge(phase) :
             expectedStepKeysNoForceMerge(phase);
@@ -43,22 +46,51 @@ public class SearchableSnapshotActionTests extends AbstractActionTestCase<Search
         assertThat(steps.get(10).getKey(), is(expectedSteps.get(10)));
         assertThat(steps.get(11).getKey(), is(expectedSteps.get(11)));
         assertThat(steps.get(12).getKey(), is(expectedSteps.get(12)));
+        assertThat(steps.get(13).getKey(), is(expectedSteps.get(13)));
+        assertThat(steps.get(14).getKey(), is(expectedSteps.get(14)));
 
         if (action.isForceMergeIndex()) {
-            assertThat(steps.get(14).getKey(), is(expectedSteps.get(14)));
-            AsyncActionBranchingStep branchStep = (AsyncActionBranchingStep) steps.get(7);
-            assertThat(branchStep.getNextKeyOnIncompleteResponse(), is(expectedSteps.get(6)));
+            assertThat(steps.get(15).getKey(), is(expectedSteps.get(15)));
+            assertThat(steps.get(16).getKey(), is(expectedSteps.get(16)));
+            AsyncActionBranchingStep branchStep = (AsyncActionBranchingStep) steps.get(8);
+            assertThat(branchStep.getNextKeyOnIncompleteResponse(), is(expectedSteps.get(7)));
         } else {
-            AsyncActionBranchingStep branchStep = (AsyncActionBranchingStep) steps.get(5);
-            assertThat(branchStep.getNextKeyOnIncompleteResponse(), is(expectedSteps.get(4)));
+            AsyncActionBranchingStep branchStep = (AsyncActionBranchingStep) steps.get(6);
+            assertThat(branchStep.getNextKeyOnIncompleteResponse(), is(expectedSteps.get(5)));
         }
     }
 
+    public void testPrefixAndStorageTypeDefaults() {
+        SearchableSnapshotAction action = new SearchableSnapshotAction("repo", randomBoolean(), null);
+        StepKey nonFrozenKey = new StepKey(randomFrom("hot", "warm", "cold", "delete"), randomAlphaOfLength(5), randomAlphaOfLength(5));
+        StepKey frozenKey = new StepKey("frozen", randomAlphaOfLength(5), randomAlphaOfLength(5));
+
+        assertThat(action.getStorageType(), equalTo(null));
+        assertThat(action.getConcreteStorageType(nonFrozenKey), equalTo(MountSearchableSnapshotRequest.Storage.FULL_COPY));
+        assertThat(action.getRestoredIndexPrefix(nonFrozenKey), equalTo(SearchableSnapshotAction.FULL_RESTORED_INDEX_PREFIX));
+
+        assertThat(action.getConcreteStorageType(frozenKey), equalTo(MountSearchableSnapshotRequest.Storage.SHARED_CACHE));
+        assertThat(action.getRestoredIndexPrefix(frozenKey), equalTo(SearchableSnapshotAction.PARTIAL_RESTORED_INDEX_PREFIX));
+
+        action = new SearchableSnapshotAction("repo", randomBoolean(), MountSearchableSnapshotRequest.Storage.FULL_COPY);
+        assertThat(action.getConcreteStorageType(nonFrozenKey), equalTo(MountSearchableSnapshotRequest.Storage.FULL_COPY));
+        assertThat(action.getRestoredIndexPrefix(nonFrozenKey), equalTo(SearchableSnapshotAction.FULL_RESTORED_INDEX_PREFIX));
+        assertThat(action.getConcreteStorageType(frozenKey), equalTo(MountSearchableSnapshotRequest.Storage.FULL_COPY));
+        assertThat(action.getRestoredIndexPrefix(frozenKey), equalTo(SearchableSnapshotAction.FULL_RESTORED_INDEX_PREFIX));
+
+        action = new SearchableSnapshotAction("repo", randomBoolean(), MountSearchableSnapshotRequest.Storage.SHARED_CACHE);
+        assertThat(action.getConcreteStorageType(nonFrozenKey), equalTo(MountSearchableSnapshotRequest.Storage.SHARED_CACHE));
+        assertThat(action.getRestoredIndexPrefix(nonFrozenKey), equalTo(SearchableSnapshotAction.PARTIAL_RESTORED_INDEX_PREFIX));
+        assertThat(action.getConcreteStorageType(frozenKey), equalTo(MountSearchableSnapshotRequest.Storage.SHARED_CACHE));
+        assertThat(action.getRestoredIndexPrefix(frozenKey), equalTo(SearchableSnapshotAction.PARTIAL_RESTORED_INDEX_PREFIX));
+    }
+
     private List<StepKey> expectedStepKeysWithForceMerge(String phase) {
         return List.of(
             new StepKey(phase, NAME, SearchableSnapshotAction.CONDITIONAL_SKIP_ACTION_STEP),
             new StepKey(phase, NAME, CheckNotDataStreamWriteIndexStep.NAME),
             new StepKey(phase, NAME, WaitForNoFollowersStep.NAME),
+            new StepKey(phase, NAME, SearchableSnapshotAction.CONDITIONAL_SKIP_GENERATE_AND_CLEAN),
             new StepKey(phase, NAME, ForceMergeStep.NAME),
             new StepKey(phase, NAME, SegmentCountStep.NAME),
             new StepKey(phase, NAME, GenerateSnapshotNameStep.NAME),
@@ -79,6 +111,7 @@ public class SearchableSnapshotActionTests extends AbstractActionTestCase<Search
             new StepKey(phase, NAME, SearchableSnapshotAction.CONDITIONAL_SKIP_ACTION_STEP),
             new StepKey(phase, NAME, CheckNotDataStreamWriteIndexStep.NAME),
             new StepKey(phase, NAME, WaitForNoFollowersStep.NAME),
+            new StepKey(phase, NAME, SearchableSnapshotAction.CONDITIONAL_SKIP_GENERATE_AND_CLEAN),
             new StepKey(phase, NAME, GenerateSnapshotNameStep.NAME),
             new StepKey(phase, NAME, CleanupSnapshotStep.NAME),
             new StepKey(phase, NAME, CreateSnapshotStep.NAME),
@@ -92,6 +125,20 @@ public class SearchableSnapshotActionTests extends AbstractActionTestCase<Search
             new StepKey(phase, NAME, SwapAliasesAndDeleteSourceIndexStep.NAME));
     }
 
+    @Nullable
+    public static MountSearchableSnapshotRequest.Storage randomStorageType() {
+        if (randomBoolean()) {
+            // null is the same as a full copy, it just means it was not specified
+            if (randomBoolean()) {
+                return null;
+            } else {
+                return MountSearchableSnapshotRequest.Storage.FULL_COPY;
+            }
+        } else {
+            return MountSearchableSnapshotRequest.Storage.SHARED_CACHE;
+        }
+    }
+
     @Override
     protected SearchableSnapshotAction doParseInstance(XContentParser parser) throws IOException {
         return SearchableSnapshotAction.parse(parser);
@@ -113,6 +160,6 @@ public class SearchableSnapshotActionTests extends AbstractActionTestCase<Search
     }
 
     static SearchableSnapshotAction randomInstance() {
-        return new SearchableSnapshotAction(randomAlphaOfLengthBetween(5, 10), randomBoolean());
+        return new SearchableSnapshotAction(randomAlphaOfLengthBetween(5, 10), randomBoolean(), randomStorageType());
     }
 }

+ 4 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/StepKeyTests.java

@@ -16,6 +16,10 @@ public class StepKeyTests extends AbstractSerializingTestCase<StepKey> {
 
     @Override
     public StepKey createTestInstance() {
+        return randomStepKey();
+    }
+
+    public static StepKey randomStepKey() {
         return new StepKey(randomAlphaOfLength(10), randomAlphaOfLength(10), randomAlphaOfLength(10));
     }
 

+ 6 - 2
x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/TimeSeriesRestDriver.java

@@ -200,8 +200,9 @@ public final class TimeSeriesRestDriver {
         client.performRequest(request);
     }
 
-    public static void createPolicy(RestClient client, String policyName, @Nullable Phase hotPhase, @Nullable Phase warmPhase,
-                                    @Nullable Phase coldPhase, @Nullable Phase deletePhase) throws IOException {
+    public static void createPolicy(RestClient client, String policyName, @Nullable Phase hotPhase,
+                                    @Nullable Phase warmPhase, @Nullable Phase coldPhase,
+                                    @Nullable Phase frozenPhase, @Nullable Phase deletePhase) throws IOException {
         if (hotPhase == null && warmPhase == null && coldPhase == null && deletePhase == null) {
             throw new IllegalArgumentException("specify at least one phase");
         }
@@ -215,6 +216,9 @@ public final class TimeSeriesRestDriver {
         if (coldPhase != null) {
             phases.put("cold", coldPhase);
         }
+        if (frozenPhase != null) {
+            phases.put("frozen", frozenPhase);
+        }
         if (deletePhase != null) {
             phases.put("delete", deletePhase);
         }

+ 5 - 3
x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/LifecycleLicenseIT.java

@@ -40,6 +40,7 @@ import static org.elasticsearch.xpack.TimeSeriesRestDriver.createSnapshotRepo;
 import static org.elasticsearch.xpack.TimeSeriesRestDriver.explainIndex;
 import static org.elasticsearch.xpack.TimeSeriesRestDriver.indexDocument;
 import static org.elasticsearch.xpack.TimeSeriesRestDriver.rolloverMaxOneDocCondition;
+import static org.elasticsearch.xpack.core.ilm.SearchableSnapshotActionTests.randomStorageType;
 import static org.hamcrest.CoreMatchers.containsStringIgnoringCase;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.is;
@@ -70,7 +71,8 @@ public class LifecycleLicenseIT extends ESRestTestCase {
         checkCurrentLicenseIs("basic");
 
         ResponseException exception = expectThrows(ResponseException.class,
-            () -> createNewSingletonPolicy(client(), policy, "cold", new SearchableSnapshotAction(snapshotRepo, true)));
+            () -> createNewSingletonPolicy(client(), policy, "cold",
+                new SearchableSnapshotAction(snapshotRepo, true, randomStorageType())));
         assertThat(EntityUtils.toString(exception.getResponse().getEntity()),
             containsStringIgnoringCase("policy [" + policy + "] defines the [" + SearchableSnapshotAction.NAME + "] action but the " +
                 "current license is non-compliant for [searchable-snapshots]"));
@@ -80,7 +82,7 @@ public class LifecycleLicenseIT extends ESRestTestCase {
     public void testSearchableSnapshotActionErrorsOnInvalidLicense() throws Exception {
         String snapshotRepo = randomAlphaOfLengthBetween(4, 10);
         createSnapshotRepo(client(), snapshotRepo, randomBoolean());
-        createNewSingletonPolicy(client(), policy, "cold", new SearchableSnapshotAction(snapshotRepo, true));
+        createNewSingletonPolicy(client(), policy, "cold", new SearchableSnapshotAction(snapshotRepo, true, null));
 
         createComposableTemplate(client(), "template-name", dataStream,
             new Template(Settings.builder().put(LifecycleSettings.LIFECYCLE_NAME, policy).build(), null, null));
@@ -114,7 +116,7 @@ public class LifecycleLicenseIT extends ESRestTestCase {
         putTrialLicense();
         checkCurrentLicenseIs("trial");
 
-        String restoredIndexName = SearchableSnapshotAction.RESTORED_INDEX_PREFIX + backingIndexName;
+        String restoredIndexName = SearchableSnapshotAction.FULL_RESTORED_INDEX_PREFIX + backingIndexName;
         assertTrue(waitUntil(() -> {
             try {
                 return indexExists(restoredIndexName);

+ 1 - 1
x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/TimeSeriesDataStreamsIT.java

@@ -142,7 +142,7 @@ public class TimeSeriesDataStreamsIT extends ESRestTestCase {
         indexDocument(client(), dataStream, true);
 
         String backingIndexName = DataStream.getDefaultBackingIndexName(dataStream, 1);
-        String restoredIndexName = SearchableSnapshotAction.RESTORED_INDEX_PREFIX + backingIndexName;
+        String restoredIndexName = SearchableSnapshotAction.FULL_RESTORED_INDEX_PREFIX + backingIndexName;
 
         assertBusy(() -> assertThat(
             "original index must wait in the " + CheckNotDataStreamWriteIndexStep.NAME + " until it is not the write index anymore",

+ 1 - 1
x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java

@@ -1171,7 +1171,7 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
             randomBoolean());
 
         String[] snapshotName = new String[1];
-        String restoredIndexName = SearchableSnapshotAction.RESTORED_INDEX_PREFIX + this.index;
+        String restoredIndexName = SearchableSnapshotAction.FULL_RESTORED_INDEX_PREFIX + this.index;
         assertTrue(waitUntil(() -> {
             try {
                 Map<String, Object> explainIndex = explainIndex(client(), index);

+ 226 - 13
x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/actions/SearchableSnapshotActionIT.java

@@ -10,6 +10,7 @@ package org.elasticsearch.xpack.ilm.actions;
 import org.apache.http.entity.ContentType;
 import org.apache.http.entity.StringEntity;
 import org.apache.http.util.EntityUtils;
+import org.elasticsearch.client.Client;
 import org.elasticsearch.client.Request;
 import org.elasticsearch.client.Response;
 import org.elasticsearch.cluster.metadata.DataStream;
@@ -19,6 +20,9 @@ import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentHelper;
+import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.test.client.NoOpClient;
 import org.elasticsearch.test.rest.ESRestTestCase;
 import org.elasticsearch.xpack.core.ilm.DeleteAction;
 import org.elasticsearch.xpack.core.ilm.ForceMergeAction;
@@ -26,6 +30,7 @@ import org.elasticsearch.xpack.core.ilm.FreezeAction;
 import org.elasticsearch.xpack.core.ilm.LifecycleAction;
 import org.elasticsearch.xpack.core.ilm.LifecyclePolicy;
 import org.elasticsearch.xpack.core.ilm.LifecycleSettings;
+import org.elasticsearch.xpack.core.ilm.MountSnapshotStep;
 import org.elasticsearch.xpack.core.ilm.Phase;
 import org.elasticsearch.xpack.core.ilm.PhaseCompleteStep;
 import org.elasticsearch.xpack.core.ilm.RolloverAction;
@@ -33,10 +38,14 @@ import org.elasticsearch.xpack.core.ilm.SearchableSnapshotAction;
 import org.elasticsearch.xpack.core.ilm.SetPriorityAction;
 import org.elasticsearch.xpack.core.ilm.ShrinkAction;
 import org.elasticsearch.xpack.core.ilm.Step;
+import org.elasticsearch.xpack.core.ilm.StepKeyTests;
+import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotRequest;
 import org.junit.Before;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -52,6 +61,7 @@ import static org.elasticsearch.xpack.TimeSeriesRestDriver.getNumberOfSegments;
 import static org.elasticsearch.xpack.TimeSeriesRestDriver.getStepKeyForIndex;
 import static org.elasticsearch.xpack.TimeSeriesRestDriver.indexDocument;
 import static org.elasticsearch.xpack.TimeSeriesRestDriver.rolloverMaxOneDocCondition;
+import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.is;
 
@@ -65,7 +75,7 @@ public class SearchableSnapshotActionIT extends ESRestTestCase {
     public void refreshIndex() {
         dataStream = "logs-" + randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
         policy = "policy-" + randomAlphaOfLength(5);
-        snapshotRepo = randomAlphaOfLengthBetween(4, 10);
+        snapshotRepo = randomAlphaOfLengthBetween(10, 20);
         logger.info("--> running [{}] with data stream [{}], snapshot repot [{}] and policy [{}]", getTestName(), dataStream,
             snapshotRepo, policy);
     }
@@ -77,7 +87,7 @@ public class SearchableSnapshotActionIT extends ESRestTestCase {
 
     public void testSearchableSnapshotAction() throws Exception {
         createSnapshotRepo(client(), snapshotRepo, randomBoolean());
-        createNewSingletonPolicy(client(), policy, "cold", new SearchableSnapshotAction(snapshotRepo, true));
+        createNewSingletonPolicy(client(), policy, "cold", new SearchableSnapshotAction(snapshotRepo, true, null));
 
         createComposableTemplate(client(), randomAlphaOfLengthBetween(5, 10).toLowerCase(), dataStream,
             new Template(Settings.builder().put(LifecycleSettings.LIFECYCLE_NAME, policy).build(), null, null));
@@ -88,7 +98,7 @@ public class SearchableSnapshotActionIT extends ESRestTestCase {
         rolloverMaxOneDocCondition(client(), dataStream);
 
         String backingIndexName = DataStream.getDefaultBackingIndexName(dataStream, 1L);
-        String restoredIndexName = SearchableSnapshotAction.RESTORED_INDEX_PREFIX + backingIndexName;
+        String restoredIndexName = SearchableSnapshotAction.FULL_RESTORED_INDEX_PREFIX + backingIndexName;
         assertTrue(waitUntil(() -> {
             try {
                 return indexExists(restoredIndexName);
@@ -103,7 +113,7 @@ public class SearchableSnapshotActionIT extends ESRestTestCase {
 
     public void testSearchableSnapshotForceMergesIndexToOneSegment() throws Exception {
         createSnapshotRepo(client(), snapshotRepo, randomBoolean());
-        createNewSingletonPolicy(client(), policy, "cold", new SearchableSnapshotAction(snapshotRepo, true));
+        createNewSingletonPolicy(client(), policy, "cold", new SearchableSnapshotAction(snapshotRepo, true, null));
 
         createComposableTemplate(client(), randomAlphaOfLengthBetween(5, 10).toLowerCase(), dataStream, new Template(null, null, null));
 
@@ -140,7 +150,7 @@ public class SearchableSnapshotActionIT extends ESRestTestCase {
             }
         }, 60, TimeUnit.SECONDS));
 
-        String restoredIndexName = SearchableSnapshotAction.RESTORED_INDEX_PREFIX + backingIndexName;
+        String restoredIndexName = SearchableSnapshotAction.FULL_RESTORED_INDEX_PREFIX + backingIndexName;
         assertTrue(waitUntil(() -> {
             try {
                 return indexExists(restoredIndexName);
@@ -184,7 +194,7 @@ public class SearchableSnapshotActionIT extends ESRestTestCase {
 
         String[] snapshotName = new String[1];
         String backingIndexName = DataStream.getDefaultBackingIndexName(dataStream, 1L);
-        String restoredIndexName = SearchableSnapshotAction.RESTORED_INDEX_PREFIX + backingIndexName;
+        String restoredIndexName = SearchableSnapshotAction.FULL_RESTORED_INDEX_PREFIX + backingIndexName;
         assertTrue(waitUntil(() -> {
             try {
                 Map<String, Object> explainIndex = explainIndex(client(), backingIndexName);
@@ -217,7 +227,7 @@ public class SearchableSnapshotActionIT extends ESRestTestCase {
                 new SearchableSnapshotAction(randomAlphaOfLengthBetween(4, 10)))),
             new Phase("warm", TimeValue.ZERO, Map.of(ForceMergeAction.NAME, new ForceMergeAction(1, null))),
             new Phase("cold", TimeValue.ZERO, Map.of(FreezeAction.NAME, new FreezeAction())),
-            null
+            null, null
             )
         );
 
@@ -231,7 +241,7 @@ public class SearchableSnapshotActionIT extends ESRestTestCase {
             new Phase("hot", TimeValue.ZERO, Map.of(RolloverAction.NAME, new RolloverAction(null, null, 1L), SearchableSnapshotAction.NAME,
                 new SearchableSnapshotAction(snapshotRepo))),
             new Phase("warm", TimeValue.timeValueDays(30), Map.of(SetPriorityAction.NAME, new SetPriorityAction(999))),
-            null, null
+            null, null, null
         );
 
         createComposableTemplate(client(), randomAlphaOfLengthBetween(5, 10).toLowerCase(), dataStream,
@@ -246,7 +256,7 @@ public class SearchableSnapshotActionIT extends ESRestTestCase {
             indexDocument(client(), dataStream, true);
         }
 
-        String restoredIndexName = SearchableSnapshotAction.RESTORED_INDEX_PREFIX + DataStream.getDefaultBackingIndexName(dataStream, 1L);
+        String restoredIndexName = SearchableSnapshotAction.FULL_RESTORED_INDEX_PREFIX + DataStream.getDefaultBackingIndexName(dataStream, 1L);
         assertTrue(waitUntil(() -> {
             try {
                 return indexExists(restoredIndexName);
@@ -267,7 +277,7 @@ public class SearchableSnapshotActionIT extends ESRestTestCase {
                 Map.of(ShrinkAction.NAME, new ShrinkAction(1, null), ForceMergeAction.NAME, new ForceMergeAction(1, null))
             ),
             new Phase("cold", TimeValue.ZERO, Map.of(SearchableSnapshotAction.NAME, new SearchableSnapshotAction(snapshotRepo))),
-            null
+            null, null
         );
 
         // even though the index is now mounted as a searchable snapshot, the actions that can't operate on it should
@@ -286,7 +296,7 @@ public class SearchableSnapshotActionIT extends ESRestTestCase {
             new Phase("hot", TimeValue.ZERO, Map.of(RolloverAction.NAME, new RolloverAction(null, null, 1L),
                 SearchableSnapshotAction.NAME, new SearchableSnapshotAction(snapshotRepo))),
             new Phase("warm", TimeValue.timeValueDays(30), Map.of(SetPriorityAction.NAME, new SetPriorityAction(999))),
-            null, null
+            null, null, null
         );
 
         createComposableTemplate(client(), randomAlphaOfLengthBetween(5, 10).toLowerCase(), dataStream,
@@ -301,7 +311,7 @@ public class SearchableSnapshotActionIT extends ESRestTestCase {
             indexDocument(client(), dataStream, true);
         }
 
-        String searchableSnapMountedIndexName = SearchableSnapshotAction.RESTORED_INDEX_PREFIX +
+        String searchableSnapMountedIndexName = SearchableSnapshotAction.FULL_RESTORED_INDEX_PREFIX +
             DataStream.getDefaultBackingIndexName(dataStream, 1L);
         assertTrue(waitUntil(() -> {
             try {
@@ -335,7 +345,7 @@ public class SearchableSnapshotActionIT extends ESRestTestCase {
                 Map.of(ShrinkAction.NAME, new ShrinkAction(1, null), ForceMergeAction.NAME, new ForceMergeAction(1, null))
             ),
             new Phase("cold", TimeValue.ZERO, Map.of(FreezeAction.NAME, new FreezeAction())),
-            null
+            null, null
         );
 
         // restore the datastream
@@ -353,4 +363,207 @@ public class SearchableSnapshotActionIT extends ESRestTestCase {
             assertThat(stepKeyForIndex.getName(), is(PhaseCompleteStep.NAME));
         }, 30, TimeUnit.SECONDS);
     }
+
+    @SuppressWarnings("unchecked")
+    public void testIdenticalSearchableSnapshotActionIsNoop() throws Exception {
+        String index = "myindex-" + randomAlphaOfLength(4).toLowerCase(Locale.ROOT);
+        createSnapshotRepo(client(), snapshotRepo, randomBoolean());
+        MountSearchableSnapshotRequest.Storage storage = randomBoolean() ?
+            MountSearchableSnapshotRequest.Storage.FULL_COPY : MountSearchableSnapshotRequest.Storage.SHARED_CACHE;
+        createPolicy(client(), policy, null, null,
+            new Phase("cold", TimeValue.ZERO,
+                singletonMap(SearchableSnapshotAction.NAME, new SearchableSnapshotAction(snapshotRepo, randomBoolean(), storage))),
+            new Phase("frozen", TimeValue.ZERO,
+                singletonMap(SearchableSnapshotAction.NAME, new SearchableSnapshotAction(snapshotRepo, randomBoolean(), storage))),
+            null
+        );
+
+        createIndex(index, Settings.builder()
+            .put(LifecycleSettings.LIFECYCLE_NAME, policy)
+            .build());
+        ensureGreen(index);
+
+        final String searchableSnapMountedIndexName = (storage == MountSearchableSnapshotRequest.Storage.FULL_COPY ?
+            SearchableSnapshotAction.FULL_RESTORED_INDEX_PREFIX : SearchableSnapshotAction.PARTIAL_RESTORED_INDEX_PREFIX) + index;
+
+        assertBusy(() -> {
+            logger.info("--> waiting for [{}] to exist...", searchableSnapMountedIndexName);
+            assertTrue(indexExists(searchableSnapMountedIndexName));
+        }, 30, TimeUnit.SECONDS);
+
+        assertBusy(() -> {
+            Step.StepKey stepKeyForIndex = getStepKeyForIndex(client(), searchableSnapMountedIndexName);
+            assertThat(stepKeyForIndex.getPhase(), is("frozen"));
+            assertThat(stepKeyForIndex.getName(), is(PhaseCompleteStep.NAME));
+        }, 30, TimeUnit.SECONDS);
+
+        Request getSnaps = new Request("GET", "/_snapshot/" + snapshotRepo + "/_all");
+        Response response = client().performRequest(getSnaps);
+        Map<String, Object> responseMap;
+        try (InputStream is = response.getEntity().getContent()) {
+            responseMap = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true);
+        }
+        assertThat("expected to have only one snapshot, but got: " + responseMap,
+            ((List<Map<String, Object>>)
+                ((Map<String, Object>)
+                    ((List<Object>) responseMap.get("responses")).get(0)).get("snapshots")).size(), equalTo(1));
+    }
+
+    @SuppressWarnings("unchecked")
+    public void testConvertingSearchableSnapshotFromFullToPartial() throws Exception {
+        String index = "myindex-" + randomAlphaOfLength(4).toLowerCase(Locale.ROOT);
+        createSnapshotRepo(client(), snapshotRepo, randomBoolean());
+        createPolicy(client(), policy, null, null,
+            new Phase("cold", TimeValue.ZERO,
+                singletonMap(SearchableSnapshotAction.NAME, new SearchableSnapshotAction(snapshotRepo, randomBoolean(),
+                    MountSearchableSnapshotRequest.Storage.FULL_COPY))),
+            new Phase("frozen", TimeValue.ZERO,
+                singletonMap(SearchableSnapshotAction.NAME, new SearchableSnapshotAction(snapshotRepo, randomBoolean(),
+                    MountSearchableSnapshotRequest.Storage.SHARED_CACHE))),
+            null
+        );
+
+        createIndex(index, Settings.builder()
+            .put(LifecycleSettings.LIFECYCLE_NAME, policy)
+            .build());
+        ensureGreen(index);
+        indexDocument(client(), index);
+
+        final String searchableSnapMountedIndexName = SearchableSnapshotAction.PARTIAL_RESTORED_INDEX_PREFIX +
+            SearchableSnapshotAction.FULL_RESTORED_INDEX_PREFIX + index;
+
+        assertBusy(() -> {
+            logger.info("--> waiting for [{}] to exist...", searchableSnapMountedIndexName);
+            assertTrue(indexExists(searchableSnapMountedIndexName));
+        }, 30, TimeUnit.SECONDS);
+
+        assertBusy(() -> {
+            Step.StepKey stepKeyForIndex = getStepKeyForIndex(client(), searchableSnapMountedIndexName);
+            assertThat(stepKeyForIndex.getPhase(), is("frozen"));
+            assertThat(stepKeyForIndex.getName(), is(PhaseCompleteStep.NAME));
+        }, 30, TimeUnit.SECONDS);
+
+        Request getSnaps = new Request("GET", "/_snapshot/" + snapshotRepo + "/_all");
+        Response response = client().performRequest(getSnaps);
+        Map<String, Object> responseMap;
+        try (InputStream is = response.getEntity().getContent()) {
+            responseMap = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true);
+        }
+        assertThat("expected to have only one snapshot, but got: " + responseMap,
+            ((List<Map<String, Object>>)
+                ((Map<String, Object>)
+                    ((List<Object>) responseMap.get("responses")).get(0)).get("snapshots")).size(), equalTo(1));
+    }
+
+    @SuppressWarnings("unchecked")
+    public void testConvertingPartialSearchableSnapshotIntoFull() throws Exception {
+        String index = "myindex-" + randomAlphaOfLength(4).toLowerCase(Locale.ROOT);
+        createSnapshotRepo(client(), snapshotRepo, randomBoolean());
+        createPolicy(client(), policy, null, null,
+            new Phase("cold", TimeValue.ZERO,
+                singletonMap(SearchableSnapshotAction.NAME, new SearchableSnapshotAction(snapshotRepo, randomBoolean(),
+                    MountSearchableSnapshotRequest.Storage.SHARED_CACHE))),
+            new Phase("frozen", TimeValue.ZERO,
+                singletonMap(SearchableSnapshotAction.NAME, new SearchableSnapshotAction(snapshotRepo, randomBoolean(),
+                    MountSearchableSnapshotRequest.Storage.FULL_COPY))),
+            null
+        );
+
+        createIndex(index, Settings.builder()
+            .put(LifecycleSettings.LIFECYCLE_NAME, policy)
+            .build());
+        ensureGreen(index);
+        indexDocument(client(), index);
+
+        final String searchableSnapMountedIndexName = SearchableSnapshotAction.FULL_RESTORED_INDEX_PREFIX +
+            SearchableSnapshotAction.PARTIAL_RESTORED_INDEX_PREFIX + index;
+
+        assertBusy(() -> {
+            logger.info("--> waiting for [{}] to exist...", searchableSnapMountedIndexName);
+            assertTrue(indexExists(searchableSnapMountedIndexName));
+        }, 30, TimeUnit.SECONDS);
+
+        assertBusy(() -> {
+            Step.StepKey stepKeyForIndex = getStepKeyForIndex(client(), searchableSnapMountedIndexName);
+            assertThat(stepKeyForIndex.getPhase(), is("frozen"));
+            assertThat(stepKeyForIndex.getName(), is(PhaseCompleteStep.NAME));
+        }, 30, TimeUnit.SECONDS);
+
+        Request getSnaps = new Request("GET", "/_snapshot/" + snapshotRepo + "/_all");
+        Response response = client().performRequest(getSnaps);
+        Map<String, Object> responseMap;
+        try (InputStream is = response.getEntity().getContent()) {
+            responseMap = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true);
+        }
+        assertThat("expected to have only one snapshot, but got: " + responseMap,
+            ((List<Map<String, Object>>)
+                ((Map<String, Object>)
+                    ((List<Object>) responseMap.get("responses")).get(0)).get("snapshots")).size(), equalTo(1));
+    }
+
+    @SuppressWarnings("unchecked")
+    @AwaitsFix(bugUrl = "functionality not yet implemented")
+    public void testSecondSearchableSnapshotChangesRepo() throws Exception {
+        String index = "myindex-" + randomAlphaOfLength(4).toLowerCase(Locale.ROOT);
+        String secondRepo = randomAlphaOfLengthBetween(10, 20);
+        createSnapshotRepo(client(), snapshotRepo, randomBoolean());
+        createSnapshotRepo(client(), secondRepo, randomBoolean());
+        createPolicy(client(), policy, null, null,
+            new Phase("cold", TimeValue.ZERO,
+                singletonMap(SearchableSnapshotAction.NAME, new SearchableSnapshotAction(snapshotRepo, randomBoolean(),
+                    MountSearchableSnapshotRequest.Storage.FULL_COPY))),
+            new Phase("frozen", TimeValue.ZERO,
+                singletonMap(SearchableSnapshotAction.NAME, new SearchableSnapshotAction(secondRepo, randomBoolean(),
+                    MountSearchableSnapshotRequest.Storage.SHARED_CACHE))),
+            null
+        );
+
+        createIndex(index, Settings.builder()
+            .put(LifecycleSettings.LIFECYCLE_NAME, policy)
+            .build());
+        ensureGreen(index);
+        indexDocument(client(), index);
+
+        final String searchableSnapMountedIndexName = SearchableSnapshotAction.PARTIAL_RESTORED_INDEX_PREFIX +
+            SearchableSnapshotAction.FULL_RESTORED_INDEX_PREFIX + index;
+
+        assertBusy(() -> {
+            logger.info("--> waiting for [{}] to exist...", searchableSnapMountedIndexName);
+            assertTrue(indexExists(searchableSnapMountedIndexName));
+        }, 30, TimeUnit.SECONDS);
+
+        assertBusy(() -> {
+            Step.StepKey stepKeyForIndex = getStepKeyForIndex(client(), searchableSnapMountedIndexName);
+            assertThat(stepKeyForIndex.getPhase(), is("frozen"));
+            assertThat(stepKeyForIndex.getName(), is(PhaseCompleteStep.NAME));
+        }, 30, TimeUnit.SECONDS);
+
+        // Check first repo has exactly 1 snapshot
+        {
+            Request getSnaps = new Request("GET", "/_snapshot/" + snapshotRepo + "/_all");
+            Response response = client().performRequest(getSnaps);
+            Map<String, Object> responseMap;
+            try (InputStream is = response.getEntity().getContent()) {
+                responseMap = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true);
+            }
+            assertThat("expected to have only one snapshot, but got: " + responseMap,
+                ((List<Map<String, Object>>)
+                    ((Map<String, Object>)
+                        ((List<Object>) responseMap.get("responses")).get(0)).get("snapshots")).size(), equalTo(1));
+        }
+
+        // Check second repo has exactly 1 snapshot
+        {
+            Request getSnaps = new Request("GET", "/_snapshot/" + secondRepo + "/_all");
+            Response response = client().performRequest(getSnaps);
+            Map<String, Object> responseMap;
+            try (InputStream is = response.getEntity().getContent()) {
+                responseMap = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true);
+            }
+            assertThat("expected to have only one snapshot, but got: " + responseMap,
+                ((List<Map<String, Object>>)
+                    ((Map<String, Object>)
+                        ((List<Object>) responseMap.get("responses")).get(0)).get("snapshots")).size(), equalTo(1));
+        }
+    }
 }