Browse Source

Pagination and Sorting for Get Snapshots API (#73952)

Pagination and snapshots for get snapshots API, build on top of the current implementation to enable work that needs this API for testing. A follow-up will leverage the changes to make things more efficient via pagination.

Relates https://github.com/elastic/elasticsearch/pull/73570 which does part of the under-the-hood changes required to efficiently implement this API on the repository layer.
Armin Braun 4 years ago
parent
commit
c1e9590a69

+ 31 - 0
docs/reference/snapshot-restore/apis/get-snapshot-api.asciidoc

@@ -100,6 +100,37 @@ comprising the number of shards in the index, the total size of the index in
 bytes, and the maximum number of segments per shard in the index. Defaults to
 `false`, meaning that this information is omitted.
 
+`sort`::
+(Optional, string)
+Allows setting a sort order for the result. Defaults to `start_time`, i.e. sorting by snapshot start time stamp.
++
+.Valid values for `sort`
+[%collapsible%open]
+====
+`start_time`::
+  Sort snapshots by their start time stamp and break ties by snapshot name.
+
+`duration`::
+  Sort snapshots by their duration and break ties by snapshot name.
+
+`name`::
+  Sort snapshots by their name.
+
+`index_count`::
+  Sort snapshots by the number of indices they contain and break ties by snapshot name.
+====
+
+`size`::
+(Optional, integer)
+ Maximum number of snapshots to return. Defaults to `0` which means return all that match the request without limit.
+
+`order`::
+(Optional, string)
+Sort order. Valid values are `asc` for ascending and `desc` for descending order. Defaults to `asc`, meaning ascending order.
+
+NOTE: The pagination parameters `size`, `order`, and `sort` are not supported when using `verbose=false` and the sort order for
+requests with `verbose=false` is undefined.
+
 [role="child_attributes"]
 [[get-snapshot-api-response-body]]
 ==== {api-response-body-title}

+ 254 - 0
qa/smoke-test-http/src/test/java/org/elasticsearch/http/snapshots/RestGetSnapshotsIT.java

@@ -0,0 +1,254 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.http.snapshots;
+
+import org.apache.http.client.methods.HttpGet;
+import org.elasticsearch.action.ActionFuture;
+import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
+import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsRequest;
+import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.DeprecationHandler;
+import org.elasticsearch.common.xcontent.NamedXContentRegistry;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.common.xcontent.json.JsonXContent;
+import org.elasticsearch.search.sort.SortOrder;
+import org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase;
+import org.elasticsearch.snapshots.SnapshotInfo;
+import org.elasticsearch.threadpool.ThreadPool;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+
+import static org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase.assertSnapshotListSorted;
+import static org.hamcrest.Matchers.in;
+import static org.hamcrest.Matchers.is;
+
+// TODO: dry up duplication across this suite and org.elasticsearch.snapshots.GetSnapshotsIT more
+public class RestGetSnapshotsIT extends AbstractSnapshotRestTestCase {
+
+    @Override
+    protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
+        return Settings.builder().put(super.nodeSettings(nodeOrdinal, otherSettings))
+                .put(ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING.getKey(), 0) // We have tests that check by-timestamp order
+                .build();
+    }
+
+    public void testSortOrder() throws Exception {
+        final String repoName = "test-repo";
+        AbstractSnapshotIntegTestCase.createRepository(logger, repoName, "fs");
+        final List<String> snapshotNamesWithoutIndex =
+            AbstractSnapshotIntegTestCase.createNSnapshots(logger, repoName, randomIntBetween(3, 20));
+
+        createIndexWithContent("test-index");
+
+        final List<String> snapshotNamesWithIndex =
+            AbstractSnapshotIntegTestCase.createNSnapshots(logger, repoName, randomIntBetween(3, 20));
+
+        final Collection<String> allSnapshotNames = new HashSet<>(snapshotNamesWithIndex);
+        allSnapshotNames.addAll(snapshotNamesWithoutIndex);
+        doTestSortOrder(repoName, allSnapshotNames, SortOrder.ASC);
+        doTestSortOrder(repoName, allSnapshotNames, SortOrder.DESC);
+    }
+
+    private void doTestSortOrder(String repoName, Collection<String> allSnapshotNames, SortOrder order) throws IOException {
+        final List<SnapshotInfo> defaultSorting =
+            clusterAdmin().prepareGetSnapshots(repoName).setOrder(order).get().getSnapshots(repoName);
+        assertSnapshotListSorted(defaultSorting, null, order);
+        assertSnapshotListSorted(
+                allSnapshotsSorted(allSnapshotNames, repoName, GetSnapshotsRequest.SortBy.NAME, order),
+                GetSnapshotsRequest.SortBy.NAME,
+                order
+        );
+        assertSnapshotListSorted(
+                allSnapshotsSorted(allSnapshotNames, repoName, GetSnapshotsRequest.SortBy.DURATION, order),
+                GetSnapshotsRequest.SortBy.DURATION,
+                order
+        );
+        assertSnapshotListSorted(
+                allSnapshotsSorted(allSnapshotNames, repoName, GetSnapshotsRequest.SortBy.INDICES, order),
+                GetSnapshotsRequest.SortBy.INDICES,
+                order
+        );
+        assertSnapshotListSorted(
+                allSnapshotsSorted(allSnapshotNames, repoName, GetSnapshotsRequest.SortBy.START_TIME, order),
+                GetSnapshotsRequest.SortBy.START_TIME,
+                order
+        );
+    }
+
+    public void testResponseSizeLimit() throws Exception {
+        final String repoName = "test-repo";
+        AbstractSnapshotIntegTestCase.createRepository(logger, repoName, "fs");
+        final List<String> names = AbstractSnapshotIntegTestCase.createNSnapshots(logger, repoName, randomIntBetween(6, 20));
+        for (GetSnapshotsRequest.SortBy sort : GetSnapshotsRequest.SortBy.values()) {
+            for (SortOrder order : SortOrder.values()) {
+                logger.info("--> testing pagination for [{}] [{}]", sort, order);
+                doTestPagination(repoName, names, sort, order);
+            }
+        }
+    }
+
+    private void doTestPagination(String repoName,
+                                  List<String> names,
+                                  GetSnapshotsRequest.SortBy sort,
+                                  SortOrder order) throws IOException {
+        final List<SnapshotInfo> allSnapshotsSorted = allSnapshotsSorted(names, repoName, sort, order);
+        final List<SnapshotInfo> batch1 = sortedWithLimit(repoName, sort, 2, order);
+        assertEquals(batch1, allSnapshotsSorted.subList(0, 2));
+        final List<SnapshotInfo> batch2 = sortedWithLimit(repoName, sort, batch1.get(1), 2, order);
+        assertEquals(batch2, allSnapshotsSorted.subList(2, 4));
+        final int lastBatch = names.size() - batch1.size() - batch2.size();
+        final List<SnapshotInfo> batch3 = sortedWithLimit(repoName, sort, batch2.get(1), lastBatch, order);
+        assertEquals(batch3, allSnapshotsSorted.subList(batch1.size() + batch2.size(), names.size()));
+        final List<SnapshotInfo> batch3NoLimit =
+            sortedWithLimit(repoName, sort, batch2.get(1), GetSnapshotsRequest.NO_LIMIT, order);
+        assertEquals(batch3, batch3NoLimit);
+        final List<SnapshotInfo> batch3LargeLimit = sortedWithLimit(
+                repoName,
+                sort,
+                batch2.get(1),
+                lastBatch + randomIntBetween(1, 100),
+                order
+        );
+        assertEquals(batch3, batch3LargeLimit);
+    }
+
+    public void testSortAndPaginateWithInProgress() throws Exception {
+        final String repoName = "test-repo";
+        AbstractSnapshotIntegTestCase.createRepository(logger, repoName, "mock");
+        final Collection<String> allSnapshotNames =
+                new HashSet<>(AbstractSnapshotIntegTestCase.createNSnapshots(logger, repoName, randomIntBetween(3, 20)));
+        createIndexWithContent("test-index-1");
+        allSnapshotNames.addAll(AbstractSnapshotIntegTestCase.createNSnapshots(logger, repoName, randomIntBetween(3, 20)));
+        createIndexWithContent("test-index-2");
+
+        final int inProgressCount = randomIntBetween(6, 20);
+        final List<ActionFuture<CreateSnapshotResponse>> inProgressSnapshots = new ArrayList<>(inProgressCount);
+        AbstractSnapshotIntegTestCase.blockAllDataNodes(repoName);
+        for (int i = 0; i < inProgressCount; i++) {
+            final String snapshotName = "snap-" + i;
+            allSnapshotNames.add(snapshotName);
+            inProgressSnapshots.add(AbstractSnapshotIntegTestCase.startFullSnapshot(logger, repoName, snapshotName, false));
+        }
+        AbstractSnapshotIntegTestCase.awaitNumberOfSnapshotsInProgress(logger, inProgressCount);
+
+        assertStablePagination(repoName, allSnapshotNames, GetSnapshotsRequest.SortBy.START_TIME);
+        assertStablePagination(repoName, allSnapshotNames, GetSnapshotsRequest.SortBy.NAME);
+        assertStablePagination(repoName, allSnapshotNames, GetSnapshotsRequest.SortBy.INDICES);
+
+        AbstractSnapshotIntegTestCase.unblockAllDataNodes(repoName);
+        for (ActionFuture<CreateSnapshotResponse> inProgressSnapshot : inProgressSnapshots) {
+            AbstractSnapshotIntegTestCase.assertSuccessful(logger, inProgressSnapshot);
+        }
+
+        assertStablePagination(repoName, allSnapshotNames, GetSnapshotsRequest.SortBy.START_TIME);
+        assertStablePagination(repoName, allSnapshotNames, GetSnapshotsRequest.SortBy.NAME);
+        assertStablePagination(repoName, allSnapshotNames, GetSnapshotsRequest.SortBy.INDICES);
+    }
+
+    private void createIndexWithContent(String indexName) {
+        logger.info("--> creating index [{}]", indexName);
+        createIndex(indexName, AbstractSnapshotIntegTestCase.SINGLE_SHARD_NO_REPLICA);
+        ensureGreen(indexName);
+        indexDoc(indexName, "some_id", "foo", "bar");
+    }
+
+    private static void assertStablePagination(String repoName,
+                                               Collection<String> allSnapshotNames,
+                                               GetSnapshotsRequest.SortBy sort) throws IOException {
+        final SortOrder order = randomFrom(SortOrder.values());
+        final List<SnapshotInfo> allSorted = allSnapshotsSorted(allSnapshotNames, repoName, sort, order);
+
+        for (int i = 1; i <= allSnapshotNames.size(); i++) {
+            final List<SnapshotInfo> subsetSorted = sortedWithLimit(repoName, sort, i, order);
+            assertEquals(subsetSorted, allSorted.subList(0, i));
+        }
+
+        for (int j = 0; j < allSnapshotNames.size(); j++) {
+            final SnapshotInfo after = allSorted.get(j);
+            for (int i = 1; i < allSnapshotNames.size() - j; i++) {
+                final List<SnapshotInfo> subsetSorted = sortedWithLimit(repoName, sort, after, i, order);
+                assertEquals(subsetSorted, allSorted.subList(j + 1, j + i + 1));
+            }
+        }
+    }
+
+    private static List<SnapshotInfo> allSnapshotsSorted(Collection<String> allSnapshotNames,
+                                                         String repoName,
+                                                         GetSnapshotsRequest.SortBy sortBy,
+                                                         SortOrder order) throws IOException {
+        final Request request = baseGetSnapshotsRequest(repoName);
+        request.addParameter("sort", sortBy.toString());
+        if (order == SortOrder.DESC || randomBoolean()) {
+            request.addParameter("order", order.toString());
+        }
+        final Response response = getRestClient().performRequest(request);
+        final List<SnapshotInfo> snapshotInfos = readSnapshotInfos(repoName, response);
+        assertEquals(snapshotInfos.size(), allSnapshotNames.size());
+        for (SnapshotInfo snapshotInfo : snapshotInfos) {
+            assertThat(snapshotInfo.snapshotId().getName(), is(in(allSnapshotNames)));
+        }
+        return snapshotInfos;
+    }
+
+    private static Request baseGetSnapshotsRequest(String repoName) {
+        return new Request(HttpGet.METHOD_NAME, "/_snapshot/" + repoName + "/*");
+    }
+
+    private static List<SnapshotInfo> sortedWithLimit(String repoName,
+                                                      GetSnapshotsRequest.SortBy sortBy,
+                                                      int size,
+                                                      SortOrder order) throws IOException {
+        final Request request = baseGetSnapshotsRequest(repoName);
+        request.addParameter("sort", sortBy.toString());
+        if (order == SortOrder.DESC || randomBoolean()) {
+            request.addParameter("order", order.toString());
+        }
+        request.addParameter("size", String.valueOf(size));
+        final Response response = getRestClient().performRequest(request);
+        return readSnapshotInfos(repoName, response);
+    }
+
+    private static List<SnapshotInfo> readSnapshotInfos(String repoName, Response response) throws IOException {
+        final List<SnapshotInfo> snapshotInfos;
+        try (InputStream input = response.getEntity().getContent();
+             XContentParser parser = JsonXContent.jsonXContent.createParser(
+                     NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, input)) {
+            snapshotInfos = GetSnapshotsResponse.fromXContent(parser).getSnapshots(repoName);
+        }
+        return snapshotInfos;
+    }
+
+    private static List<SnapshotInfo> sortedWithLimit(String repoName,
+                                                      GetSnapshotsRequest.SortBy sortBy,
+                                                      SnapshotInfo after,
+                                                      int size,
+                                                      SortOrder order) throws IOException {
+        final Request request = baseGetSnapshotsRequest(repoName);
+        request.addParameter("sort", sortBy.toString());
+        if (size != GetSnapshotsRequest.NO_LIMIT || randomBoolean()) {
+            request.addParameter("size", String.valueOf(size));
+        }
+        if (after != null) {
+            request.addParameter("after", GetSnapshotsRequest.After.from(after, sortBy).value() + "," + after.snapshotId().getName());
+        }
+        if (order == SortOrder.DESC || randomBoolean()) {
+            request.addParameter("order", order.toString());
+        }
+        final Response response = getRestClient().performRequest(request);
+        return readSnapshotInfos(repoName, response);
+    }
+}

+ 1 - 0
server/src/internalClusterTest/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java

@@ -1056,6 +1056,7 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
         final ActionFuture<AcknowledgedResponse> deleteResponse = startDeleteSnapshot(repoName, snapshotName);
 
         awaitClusterState(
+            logger,
             otherDataNode,
             state -> state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY)
                 .entries()

+ 231 - 0
server/src/internalClusterTest/java/org/elasticsearch/snapshots/GetSnapshotsIT.java

@@ -0,0 +1,231 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.snapshots;
+
+import org.elasticsearch.action.ActionFuture;
+import org.elasticsearch.action.ActionRequestValidationException;
+import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
+import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsRequest;
+import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsRequestBuilder;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.search.sort.SortOrder;
+import org.elasticsearch.threadpool.ThreadPool;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+
+import static org.hamcrest.Matchers.in;
+import static org.hamcrest.Matchers.is;
+
+public class GetSnapshotsIT extends AbstractSnapshotIntegTestCase {
+
+    @Override
+    protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
+        return Settings.builder()
+            .put(super.nodeSettings(nodeOrdinal, otherSettings))
+            .put(ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING.getKey(), 0) // We have tests that check by-timestamp order
+            .build();
+    }
+
+    public void testSortBy() throws Exception {
+        final String repoName = "test-repo";
+        final Path repoPath = randomRepoPath();
+        createRepository(repoName, "fs", repoPath);
+        maybeInitWithOldSnapshotVersion(repoName, repoPath);
+        final List<String> snapshotNamesWithoutIndex = createNSnapshots(repoName, randomIntBetween(3, 20));
+
+        createIndexWithContent("test-index");
+
+        final List<String> snapshotNamesWithIndex = createNSnapshots(repoName, randomIntBetween(3, 20));
+
+        final Collection<String> allSnapshotNames = new HashSet<>(snapshotNamesWithIndex);
+        allSnapshotNames.addAll(snapshotNamesWithoutIndex);
+
+        doTestSortOrder(repoName, allSnapshotNames, SortOrder.ASC);
+        doTestSortOrder(repoName, allSnapshotNames, SortOrder.DESC);
+    }
+
+    private void doTestSortOrder(String repoName, Collection<String> allSnapshotNames, SortOrder order) {
+        final List<SnapshotInfo> defaultSorting = clusterAdmin().prepareGetSnapshots(repoName).setOrder(order).get().getSnapshots(repoName);
+        assertSnapshotListSorted(defaultSorting, null, order);
+        assertSnapshotListSorted(
+            allSnapshotsSorted(allSnapshotNames, repoName, GetSnapshotsRequest.SortBy.NAME, order),
+            GetSnapshotsRequest.SortBy.NAME,
+            order
+        );
+        assertSnapshotListSorted(
+            allSnapshotsSorted(allSnapshotNames, repoName, GetSnapshotsRequest.SortBy.DURATION, order),
+            GetSnapshotsRequest.SortBy.DURATION,
+            order
+        );
+        assertSnapshotListSorted(
+            allSnapshotsSorted(allSnapshotNames, repoName, GetSnapshotsRequest.SortBy.INDICES, order),
+            GetSnapshotsRequest.SortBy.INDICES,
+            order
+        );
+        assertSnapshotListSorted(
+            allSnapshotsSorted(allSnapshotNames, repoName, GetSnapshotsRequest.SortBy.START_TIME, order),
+            GetSnapshotsRequest.SortBy.START_TIME,
+            order
+        );
+    }
+
+    public void testResponseSizeLimit() throws Exception {
+        final String repoName = "test-repo";
+        final Path repoPath = randomRepoPath();
+        createRepository(repoName, "fs", repoPath);
+        maybeInitWithOldSnapshotVersion(repoName, repoPath);
+        final List<String> names = createNSnapshots(repoName, randomIntBetween(6, 20));
+        for (GetSnapshotsRequest.SortBy sort : GetSnapshotsRequest.SortBy.values()) {
+            for (SortOrder order : SortOrder.values()) {
+                logger.info("--> testing pagination for [{}] [{}]", sort, order);
+                doTestPagination(repoName, names, sort, order);
+            }
+        }
+    }
+
+    private void doTestPagination(String repoName, List<String> names, GetSnapshotsRequest.SortBy sort, SortOrder order) {
+        final List<SnapshotInfo> allSnapshotsSorted = allSnapshotsSorted(names, repoName, sort, order);
+        final List<SnapshotInfo> batch1 = sortedWithLimit(repoName, sort, null, 2, order);
+        assertEquals(batch1, allSnapshotsSorted.subList(0, 2));
+        final List<SnapshotInfo> batch2 = sortedWithLimit(repoName, sort, batch1.get(1), 2, order);
+        assertEquals(batch2, allSnapshotsSorted.subList(2, 4));
+        final int lastBatch = names.size() - batch1.size() - batch2.size();
+        final List<SnapshotInfo> batch3 = sortedWithLimit(repoName, sort, batch2.get(1), lastBatch, order);
+        assertEquals(batch3, allSnapshotsSorted.subList(batch1.size() + batch2.size(), names.size()));
+        final List<SnapshotInfo> batch3NoLimit = sortedWithLimit(repoName, sort, batch2.get(1), GetSnapshotsRequest.NO_LIMIT, order);
+        assertEquals(batch3, batch3NoLimit);
+        final List<SnapshotInfo> batch3LargeLimit = sortedWithLimit(
+            repoName,
+            sort,
+            batch2.get(1),
+            lastBatch + randomIntBetween(1, 100),
+            order
+        );
+        assertEquals(batch3, batch3LargeLimit);
+    }
+
+    public void testSortAndPaginateWithInProgress() throws Exception {
+        final String repoName = "test-repo";
+        final Path repoPath = randomRepoPath();
+        createRepository(repoName, "mock", repoPath);
+        maybeInitWithOldSnapshotVersion(repoName, repoPath);
+        final Collection<String> allSnapshotNames = new HashSet<>(createNSnapshots(repoName, randomIntBetween(3, 20)));
+        createIndexWithContent("test-index-1");
+        allSnapshotNames.addAll(createNSnapshots(repoName, randomIntBetween(3, 20)));
+        createIndexWithContent("test-index-2");
+
+        final int inProgressCount = randomIntBetween(6, 20);
+        final List<ActionFuture<CreateSnapshotResponse>> inProgressSnapshots = new ArrayList<>(inProgressCount);
+        blockAllDataNodes(repoName);
+        for (int i = 0; i < inProgressCount; i++) {
+            final String snapshotName = "snap-" + i;
+            allSnapshotNames.add(snapshotName);
+            inProgressSnapshots.add(startFullSnapshot(repoName, snapshotName));
+        }
+        awaitNumberOfSnapshotsInProgress(inProgressCount);
+
+        assertStablePagination(repoName, allSnapshotNames, GetSnapshotsRequest.SortBy.START_TIME);
+        assertStablePagination(repoName, allSnapshotNames, GetSnapshotsRequest.SortBy.NAME);
+        assertStablePagination(repoName, allSnapshotNames, GetSnapshotsRequest.SortBy.INDICES);
+
+        unblockAllDataNodes(repoName);
+        for (ActionFuture<CreateSnapshotResponse> inProgressSnapshot : inProgressSnapshots) {
+            assertSuccessful(inProgressSnapshot);
+        }
+
+        assertStablePagination(repoName, allSnapshotNames, GetSnapshotsRequest.SortBy.START_TIME);
+        assertStablePagination(repoName, allSnapshotNames, GetSnapshotsRequest.SortBy.NAME);
+        assertStablePagination(repoName, allSnapshotNames, GetSnapshotsRequest.SortBy.INDICES);
+    }
+
+    public void testPaginationRequiresVerboseListing() throws Exception {
+        final String repoName = "tst-repo";
+        createRepository(repoName, "fs");
+        createNSnapshots(repoName, randomIntBetween(1, 5));
+        expectThrows(
+            ActionRequestValidationException.class,
+            () -> clusterAdmin().prepareGetSnapshots(repoName)
+                .setVerbose(false)
+                .setSort(GetSnapshotsRequest.SortBy.DURATION)
+                .setSize(GetSnapshotsRequest.NO_LIMIT)
+                .execute()
+                .actionGet()
+        );
+        expectThrows(
+            ActionRequestValidationException.class,
+            () -> clusterAdmin().prepareGetSnapshots(repoName)
+                .setVerbose(false)
+                .setSort(GetSnapshotsRequest.SortBy.START_TIME)
+                .setSize(randomIntBetween(1, 100))
+                .execute()
+                .actionGet()
+        );
+    }
+
+    private static void assertStablePagination(String repoName, Collection<String> allSnapshotNames, GetSnapshotsRequest.SortBy sort) {
+        final SortOrder order = randomFrom(SortOrder.values());
+        final List<SnapshotInfo> allSorted = allSnapshotsSorted(allSnapshotNames, repoName, sort, order);
+
+        for (int i = 1; i <= allSnapshotNames.size(); i++) {
+            final List<SnapshotInfo> subsetSorted = sortedWithLimit(repoName, sort, null, i, order);
+            assertEquals(subsetSorted, allSorted.subList(0, i));
+        }
+
+        for (int j = 0; j < allSnapshotNames.size(); j++) {
+            final SnapshotInfo after = allSorted.get(j);
+            for (int i = 1; i < allSnapshotNames.size() - j; i++) {
+                final List<SnapshotInfo> subsetSorted = sortedWithLimit(repoName, sort, after, i, order);
+                assertEquals(subsetSorted, allSorted.subList(j + 1, j + i + 1));
+            }
+        }
+    }
+
+    private static List<SnapshotInfo> allSnapshotsSorted(
+        Collection<String> allSnapshotNames,
+        String repoName,
+        GetSnapshotsRequest.SortBy sortBy,
+        SortOrder order
+    ) {
+        final List<SnapshotInfo> snapshotInfos = sortedWithLimit(repoName, sortBy, null, GetSnapshotsRequest.NO_LIMIT, order);
+        assertEquals(snapshotInfos.size(), allSnapshotNames.size());
+        for (SnapshotInfo snapshotInfo : snapshotInfos) {
+            assertThat(snapshotInfo.snapshotId().getName(), is(in(allSnapshotNames)));
+        }
+        return snapshotInfos;
+    }
+
+    private static List<SnapshotInfo> sortedWithLimit(
+        String repoName,
+        GetSnapshotsRequest.SortBy sortBy,
+        SnapshotInfo after,
+        int size,
+        SortOrder order
+    ) {
+        return baseGetSnapshotsRequest(repoName).setAfter(after, sortBy).setSize(size).setOrder(order).get().getSnapshots(repoName);
+    }
+
+    private static GetSnapshotsRequestBuilder baseGetSnapshotsRequest(String repoName) {
+        final GetSnapshotsRequestBuilder builder = clusterAdmin().prepareGetSnapshots(repoName);
+        // exclude old version snapshot from test assertions every time and do a prefixed query in either case half the time
+        if (randomBoolean()
+            || clusterAdmin().prepareGetSnapshots(repoName)
+                .setSnapshots(AbstractSnapshotIntegTestCase.OLD_VERSION_SNAPSHOT_PREFIX + "*")
+                .setIgnoreUnavailable(true)
+                .get()
+                .getSnapshots(repoName)
+                .isEmpty() == false) {
+            builder.setSnapshots(RANDOM_SNAPSHOT_NAME_PREFIX + "*");
+        }
+        return builder;
+    }
+}

+ 176 - 0
server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/GetSnapshotsRequest.java

@@ -14,6 +14,10 @@ import org.elasticsearch.action.support.master.MasterNodeRequest;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.core.Nullable;
+import org.elasticsearch.search.sort.SortOrder;
+import org.elasticsearch.snapshots.SnapshotInfo;
 import org.elasticsearch.tasks.CancellableTask;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.tasks.TaskId;
@@ -31,8 +35,25 @@ public class GetSnapshotsRequest extends MasterNodeRequest<GetSnapshotsRequest>
     public static final String ALL_SNAPSHOTS = "_all";
     public static final String CURRENT_SNAPSHOT = "_current";
     public static final boolean DEFAULT_VERBOSE_MODE = true;
+
     public static final Version MULTIPLE_REPOSITORIES_SUPPORT_ADDED = Version.V_8_0_0;
 
+    public static final Version PAGINATED_GET_SNAPSHOTS_VERSION = Version.V_8_0_0;
+
+    public static final int NO_LIMIT = -1;
+
+    /**
+     * Number of snapshots to fetch information for or {@link #NO_LIMIT} for fetching all snapshots matching the request.
+     */
+    private int size = NO_LIMIT;
+
+    @Nullable
+    private After after;
+
+    private SortBy sort = SortBy.START_TIME;
+
+    private SortOrder order = SortOrder.ASC;
+
     private String[] repositories;
 
     private String[] snapshots = Strings.EMPTY_ARRAY;
@@ -73,6 +94,12 @@ public class GetSnapshotsRequest extends MasterNodeRequest<GetSnapshotsRequest>
         snapshots = in.readStringArray();
         ignoreUnavailable = in.readBoolean();
         verbose = in.readBoolean();
+        if (in.getVersion().onOrAfter(PAGINATED_GET_SNAPSHOTS_VERSION)) {
+            after = in.readOptionalWriteable(After::new);
+            sort = in.readEnum(SortBy.class);
+            size = in.readVInt();
+            order = SortOrder.readFromStream(in);
+        }
     }
 
     @Override
@@ -93,6 +120,14 @@ public class GetSnapshotsRequest extends MasterNodeRequest<GetSnapshotsRequest>
         out.writeStringArray(snapshots);
         out.writeBoolean(ignoreUnavailable);
         out.writeBoolean(verbose);
+        if (out.getVersion().onOrAfter(PAGINATED_GET_SNAPSHOTS_VERSION)) {
+            out.writeOptionalWriteable(after);
+            out.writeEnum(sort);
+            out.writeVInt(size);
+            order.writeTo(out);
+        } else if (sort != SortBy.START_TIME || size != NO_LIMIT || after != null || order != SortOrder.ASC) {
+            throw new IllegalArgumentException("can't use paginated get snapshots request with node version [" + out.getVersion() + "]");
+        }
     }
 
     @Override
@@ -101,6 +136,23 @@ public class GetSnapshotsRequest extends MasterNodeRequest<GetSnapshotsRequest>
         if (repositories == null || repositories.length == 0) {
             validationException = addValidationError("repositories are missing", validationException);
         }
+        if (size == 0 || size < NO_LIMIT) {
+            validationException = addValidationError("size must be -1 or greater than 0", validationException);
+        }
+        if (verbose == false) {
+            if (sort != SortBy.START_TIME) {
+                validationException = addValidationError("can't use non-default sort with verbose=false", validationException);
+            }
+            if (size > 0) {
+                validationException = addValidationError("can't use size limit with verbose=false", validationException);
+            }
+            if (after != null) {
+                validationException = addValidationError("can't use after with verbose=false", validationException);
+            }
+            if (order != SortOrder.ASC) {
+                validationException = addValidationError("can't use non-default sort order with verbose=false", validationException);
+            }
+        }
         return validationException;
     }
 
@@ -174,6 +226,42 @@ public class GetSnapshotsRequest extends MasterNodeRequest<GetSnapshotsRequest>
         return this;
     }
 
+    public After after() {
+        return after;
+    }
+
+    public SortBy sort() {
+        return sort;
+    }
+
+    public GetSnapshotsRequest after(@Nullable After after) {
+        this.after = after;
+        return this;
+    }
+
+    public GetSnapshotsRequest sort(SortBy sort) {
+        this.sort = sort;
+        return this;
+    }
+
+    public GetSnapshotsRequest size(int size) {
+        this.size = size;
+        return this;
+    }
+
+    public int size() {
+        return size;
+    }
+
+    public SortOrder order() {
+        return order;
+    }
+
+    public GetSnapshotsRequest order(SortOrder order) {
+        this.order = order;
+        return this;
+    }
+
     /**
      * Returns whether the request will return a verbose response.
      */
@@ -185,4 +273,92 @@ public class GetSnapshotsRequest extends MasterNodeRequest<GetSnapshotsRequest>
     public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
         return new CancellableTask(id, type, action, getDescription(), parentTaskId, headers);
     }
+
+    public enum SortBy {
+        START_TIME("start_time"),
+        NAME("name"),
+        DURATION("duration"),
+        INDICES("index_count");
+
+        private final String param;
+
+        SortBy(String param) {
+            this.param = param;
+        }
+
+        @Override
+        public String toString() {
+            return param;
+        }
+
+        public static SortBy of(String value) {
+            switch (value) {
+                case "start_time":
+                    return START_TIME;
+                case "name":
+                    return NAME;
+                case "duration":
+                    return DURATION;
+                case "index_count":
+                    return INDICES;
+                default:
+                    throw new IllegalArgumentException("unknown sort order [" + value + "]");
+            }
+        }
+    }
+
+    public static final class After implements Writeable {
+
+        private final String value;
+
+        private final String snapshotName;
+
+        After(StreamInput in) throws IOException {
+            this(in.readString(), in.readString());
+        }
+
+        @Nullable
+        public static After from(@Nullable SnapshotInfo snapshotInfo, SortBy sortBy) {
+            if (snapshotInfo == null) {
+                return null;
+            }
+            final String afterValue;
+            switch (sortBy) {
+                case START_TIME:
+                    afterValue = String.valueOf(snapshotInfo.startTime());
+                    break;
+                case NAME:
+                    afterValue = snapshotInfo.snapshotId().getName();
+                    break;
+                case DURATION:
+                    afterValue = String.valueOf(snapshotInfo.endTime() - snapshotInfo.startTime());
+                    break;
+                case INDICES:
+                    afterValue = String.valueOf(snapshotInfo.indices().size());
+                    break;
+                default:
+                    throw new AssertionError("unknown sort column [" + sortBy + "]");
+            }
+            return new After(afterValue, snapshotInfo.snapshotId().getName());
+        }
+
+        public After(String value, String snapshotName) {
+            this.value = value;
+            this.snapshotName = snapshotName;
+        }
+
+        public String value() {
+            return value;
+        }
+
+        public String snapshotName() {
+            return snapshotName;
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            out.writeString(value);
+            out.writeString(snapshotName);
+        }
+    }
 }

+ 27 - 0
server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/GetSnapshotsRequestBuilder.java

@@ -11,6 +11,9 @@ package org.elasticsearch.action.admin.cluster.snapshots.get;
 import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
 import org.elasticsearch.client.ElasticsearchClient;
 import org.elasticsearch.common.util.ArrayUtils;
+import org.elasticsearch.core.Nullable;
+import org.elasticsearch.search.sort.SortOrder;
+import org.elasticsearch.snapshots.SnapshotInfo;
 
 /**
  * Get snapshots request builder
@@ -95,4 +98,28 @@ public class GetSnapshotsRequestBuilder extends MasterNodeOperationRequestBuilde
         return this;
     }
 
+    public GetSnapshotsRequestBuilder setAfter(@Nullable SnapshotInfo after, GetSnapshotsRequest.SortBy sortBy) {
+        return setAfter(GetSnapshotsRequest.After.from(after, sortBy)).setSort(sortBy);
+    }
+
+    public GetSnapshotsRequestBuilder setAfter(@Nullable GetSnapshotsRequest.After after) {
+        request.after(after);
+        return this;
+    }
+
+    public GetSnapshotsRequestBuilder setSort(GetSnapshotsRequest.SortBy sort) {
+        request.sort(sort);
+        return this;
+    }
+
+    public GetSnapshotsRequestBuilder setSize(int size) {
+        request.size(size);
+        return this;
+    }
+
+    public GetSnapshotsRequestBuilder setOrder(SortOrder order) {
+        request.order(order);
+        return this;
+    }
+
 }

+ 146 - 18
server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java

@@ -34,6 +34,7 @@ import org.elasticsearch.repositories.RepositoriesService;
 import org.elasticsearch.repositories.Repository;
 import org.elasticsearch.repositories.RepositoryData;
 import org.elasticsearch.repositories.RepositoryMissingException;
+import org.elasticsearch.search.sort.SortOrder;
 import org.elasticsearch.snapshots.SnapshotId;
 import org.elasticsearch.snapshots.SnapshotInfo;
 import org.elasticsearch.snapshots.SnapshotMissingException;
@@ -47,14 +48,16 @@ import org.elasticsearch.transport.TransportService;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Predicate;
+import java.util.function.ToLongFunction;
 import java.util.stream.Collectors;
-
-import static java.util.Collections.unmodifiableList;
+import java.util.stream.Stream;
 
 /**
  * Transport Action for get snapshots operation
@@ -109,6 +112,10 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeAction<GetSn
             request.ignoreUnavailable(),
             request.verbose(),
             (CancellableTask) task,
+            request.sort(),
+            request.after(),
+            request.size(),
+            request.order(),
             listener
         );
     }
@@ -120,6 +127,10 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeAction<GetSn
         boolean ignoreUnavailable,
         boolean verbose,
         CancellableTask cancellableTask,
+        GetSnapshotsRequest.SortBy sortBy,
+        @Nullable GetSnapshotsRequest.After after,
+        int size,
+        SortOrder order,
         ActionListener<GetSnapshotsResponse> listener
     ) {
         // short-circuit if there are no repos, because we can not create GroupedActionListener of size 0
@@ -144,6 +155,10 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeAction<GetSn
                 ignoreUnavailable,
                 verbose,
                 cancellableTask,
+                sortBy,
+                after,
+                size,
+                order,
                 groupedActionListener.delegateResponse((groupedListener, e) -> {
                     if (e instanceof ElasticsearchException) {
                         groupedListener.onResponse(GetSnapshotsResponse.Response.error(repoName, (ElasticsearchException) e));
@@ -162,11 +177,15 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeAction<GetSn
         boolean ignoreUnavailable,
         boolean verbose,
         CancellableTask task,
+        GetSnapshotsRequest.SortBy sortBy,
+        @Nullable final GetSnapshotsRequest.After after,
+        int size,
+        SortOrder order,
         ActionListener<List<SnapshotInfo>> listener
     ) {
         final Map<String, SnapshotId> allSnapshotIds = new HashMap<>();
         final List<SnapshotInfo> currentSnapshots = new ArrayList<>();
-        for (SnapshotInfo snapshotInfo : sortedCurrentSnapshots(snapshotsInProgress, repo)) {
+        for (SnapshotInfo snapshotInfo : sortedCurrentSnapshots(snapshotsInProgress, repo, sortBy, after, size, order)) {
             SnapshotId snapshotId = snapshotInfo.snapshotId();
             allSnapshotIds.put(snapshotId.getName(), snapshotId);
             currentSnapshots.add(snapshotInfo);
@@ -190,6 +209,10 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeAction<GetSn
                 currentSnapshots,
                 repositoryData,
                 task,
+                sortBy,
+                after,
+                size,
+                order,
                 listener
             ),
             listener::onFailure
@@ -203,7 +226,14 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeAction<GetSn
      * @param repositoryName repository name
      * @return list of snapshots
      */
-    private static List<SnapshotInfo> sortedCurrentSnapshots(SnapshotsInProgress snapshotsInProgress, String repositoryName) {
+    private static List<SnapshotInfo> sortedCurrentSnapshots(
+        SnapshotsInProgress snapshotsInProgress,
+        String repositoryName,
+        GetSnapshotsRequest.SortBy sortBy,
+        @Nullable final GetSnapshotsRequest.After after,
+        int size,
+        SortOrder order
+    ) {
         List<SnapshotInfo> snapshotList = new ArrayList<>();
         List<SnapshotsInProgress.Entry> entries = SnapshotsService.currentSnapshots(
             snapshotsInProgress,
@@ -213,8 +243,7 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeAction<GetSn
         for (SnapshotsInProgress.Entry entry : entries) {
             snapshotList.add(new SnapshotInfo(entry));
         }
-        CollectionUtil.timSort(snapshotList);
-        return unmodifiableList(snapshotList);
+        return sortSnapshots(snapshotList, sortBy, after, size, order);
     }
 
     private void loadSnapshotInfos(
@@ -227,6 +256,10 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeAction<GetSn
         List<SnapshotInfo> currentSnapshots,
         @Nullable RepositoryData repositoryData,
         CancellableTask task,
+        GetSnapshotsRequest.SortBy sortBy,
+        @Nullable final GetSnapshotsRequest.After after,
+        int size,
+        SortOrder order,
         ActionListener<List<SnapshotInfo>> listener
     ) {
         if (task.isCancelled()) {
@@ -268,16 +301,21 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeAction<GetSn
         }
 
         if (verbose) {
-            snapshots(snapshotsInProgress, repo, toResolve, ignoreUnavailable, task, listener);
+            snapshots(snapshotsInProgress, repo, toResolve, ignoreUnavailable, task, sortBy, after, size, order, listener);
         } else {
             final List<SnapshotInfo> snapshotInfos;
             if (repositoryData != null) {
                 // want non-current snapshots as well, which are found in the repository data
-                snapshotInfos = buildSimpleSnapshotInfos(toResolve, repositoryData, currentSnapshots);
+                snapshotInfos = buildSimpleSnapshotInfos(toResolve, repositoryData, currentSnapshots, sortBy, after, size, order);
             } else {
                 // only want current snapshots
-                snapshotInfos = currentSnapshots.stream().map(SnapshotInfo::basic).collect(Collectors.toList());
-                CollectionUtil.timSort(snapshotInfos);
+                snapshotInfos = sortSnapshots(
+                    currentSnapshots.stream().map(SnapshotInfo::basic).collect(Collectors.toList()),
+                    sortBy,
+                    after,
+                    size,
+                    order
+                );
             }
             listener.onResponse(snapshotInfos);
         }
@@ -285,12 +323,10 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeAction<GetSn
 
     /**
      * Returns a list of snapshots from repository sorted by snapshot creation date
-     *
-     * @param snapshotsInProgress snapshots in progress in the cluster state
+     *  @param snapshotsInProgress snapshots in progress in the cluster state
      * @param repositoryName      repository name
      * @param snapshotIds         snapshots for which to fetch snapshot information
      * @param ignoreUnavailable   if true, snapshots that could not be read will only be logged with a warning,
-     *                            if false, they will throw an error
      */
     private void snapshots(
         SnapshotsInProgress snapshotsInProgress,
@@ -298,6 +334,10 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeAction<GetSn
         Collection<SnapshotId> snapshotIds,
         boolean ignoreUnavailable,
         CancellableTask task,
+        GetSnapshotsRequest.SortBy sortBy,
+        @Nullable GetSnapshotsRequest.After after,
+        int size,
+        SortOrder order,
         ActionListener<List<SnapshotInfo>> listener
     ) {
         if (task.isCancelled()) {
@@ -327,8 +367,7 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeAction<GetSn
         final ActionListener<Void> allDoneListener = listener.delegateFailure((l, v) -> {
             final ArrayList<SnapshotInfo> snapshotList = new ArrayList<>(snapshotInfos);
             snapshotList.addAll(snapshotSet);
-            CollectionUtil.timSort(snapshotList);
-            listener.onResponse(unmodifiableList(snapshotList));
+            listener.onResponse(sortSnapshots(snapshotList, sortBy, after, size, order));
         });
         if (snapshotIdsToIterate.isEmpty()) {
             allDoneListener.onResponse(null);
@@ -374,7 +413,11 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeAction<GetSn
     private static List<SnapshotInfo> buildSimpleSnapshotInfos(
         final Set<SnapshotId> toResolve,
         final RepositoryData repositoryData,
-        final List<SnapshotInfo> currentSnapshots
+        final List<SnapshotInfo> currentSnapshots,
+        final GetSnapshotsRequest.SortBy sortBy,
+        @Nullable final GetSnapshotsRequest.After after,
+        final int size,
+        final SortOrder order
     ) {
         List<SnapshotInfo> snapshotInfos = new ArrayList<>();
         for (SnapshotInfo snapshotInfo : currentSnapshots) {
@@ -403,7 +446,92 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeAction<GetSn
                 )
             );
         }
-        CollectionUtil.timSort(snapshotInfos);
-        return Collections.unmodifiableList(snapshotInfos);
+        return sortSnapshots(snapshotInfos, sortBy, after, size, order);
+    }
+
+    private static final Comparator<SnapshotInfo> BY_START_TIME = Comparator.comparingLong(SnapshotInfo::startTime)
+        .thenComparing(SnapshotInfo::snapshotId);
+
+    private static final Comparator<SnapshotInfo> BY_DURATION = Comparator.<SnapshotInfo>comparingLong(
+        sni -> sni.endTime() - sni.startTime()
+    ).thenComparing(SnapshotInfo::snapshotId);
+
+    private static final Comparator<SnapshotInfo> BY_INDICES_COUNT = Comparator.<SnapshotInfo>comparingInt(sni -> sni.indices().size())
+        .thenComparing(SnapshotInfo::snapshotId);
+
+    private static final Comparator<SnapshotInfo> BY_NAME = Comparator.comparing(sni -> sni.snapshotId().getName());
+
+    private static List<SnapshotInfo> sortSnapshots(
+        List<SnapshotInfo> snapshotInfos,
+        GetSnapshotsRequest.SortBy sortBy,
+        @Nullable GetSnapshotsRequest.After after,
+        int size,
+        SortOrder order
+    ) {
+        final Comparator<SnapshotInfo> comparator;
+        switch (sortBy) {
+            case START_TIME:
+                comparator = BY_START_TIME;
+                break;
+            case NAME:
+                comparator = BY_NAME;
+                break;
+            case DURATION:
+                comparator = BY_DURATION;
+                break;
+            case INDICES:
+                comparator = BY_INDICES_COUNT;
+                break;
+            default:
+                throw new AssertionError("unexpected sort column [" + sortBy + "]");
+        }
+
+        Stream<SnapshotInfo> infos = snapshotInfos.stream();
+
+        if (after != null) {
+            final Predicate<SnapshotInfo> isAfter;
+            final String name = after.snapshotName();
+            switch (sortBy) {
+                case START_TIME:
+                    isAfter = filterByLongOffset(SnapshotInfo::startTime, Long.parseLong(after.value()), name, order);
+                    break;
+                case NAME:
+                    isAfter = order == SortOrder.ASC ? (info -> compareName(name, info) < 0) : (info -> compareName(name, info) > 0);
+                    break;
+                case DURATION:
+                    isAfter = filterByLongOffset(info -> info.endTime() - info.startTime(), Long.parseLong(after.value()), name, order);
+                    break;
+                case INDICES:
+                    isAfter = filterByLongOffset(info -> info.indices().size(), Integer.parseInt(after.value()), name, order);
+                    break;
+                default:
+                    throw new AssertionError("unexpected sort column [" + sortBy + "]");
+            }
+            infos = infos.filter(isAfter);
+        }
+        infos = infos.sorted(order == SortOrder.DESC ? comparator.reversed() : comparator);
+        if (size != GetSnapshotsRequest.NO_LIMIT) {
+            infos = infos.limit(size);
+        }
+        return infos.collect(Collectors.toUnmodifiableList());
+    }
+
+    private static Predicate<SnapshotInfo> filterByLongOffset(
+        ToLongFunction<SnapshotInfo> extractor,
+        long after,
+        String name,
+        SortOrder order
+    ) {
+        return order == SortOrder.ASC ? info -> {
+            final long val = extractor.applyAsLong(info);
+            return after < val || (after == val && compareName(name, info) < 0);
+        } : info -> {
+            final long val = extractor.applyAsLong(info);
+            return after > val || (after == val && compareName(name, info) > 0);
+        };
+    }
+
+    private static int compareName(String name, SnapshotInfo info) {
+        return name.compareTo(info.snapshotId().getName());
     }
 }

+ 18 - 0
server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestGetSnapshotsAction.java

@@ -15,6 +15,7 @@ import org.elasticsearch.rest.BaseRestHandler;
 import org.elasticsearch.rest.RestRequest;
 import org.elasticsearch.rest.action.RestToXContentListener;
 import org.elasticsearch.rest.action.RestCancellableNodeClient;
+import org.elasticsearch.search.sort.SortOrder;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -53,6 +54,23 @@ public class RestGetSnapshotsAction extends BaseRestHandler {
         GetSnapshotsRequest getSnapshotsRequest = getSnapshotsRequest(repositories).snapshots(snapshots);
         getSnapshotsRequest.ignoreUnavailable(request.paramAsBoolean("ignore_unavailable", getSnapshotsRequest.ignoreUnavailable()));
         getSnapshotsRequest.verbose(request.paramAsBoolean("verbose", getSnapshotsRequest.verbose()));
+        final GetSnapshotsRequest.SortBy sort = GetSnapshotsRequest.SortBy.of(request.param("sort", getSnapshotsRequest.sort().toString()));
+        getSnapshotsRequest.sort(sort);
+        final int size = request.paramAsInt("size", getSnapshotsRequest.size());
+        getSnapshotsRequest.size(size);
+        final String[] afterString = request.paramAsStringArray("after", Strings.EMPTY_ARRAY);
+        final GetSnapshotsRequest.After after;
+        if (afterString.length == 0) {
+            after = null;
+        } else if (afterString.length == 2) {
+            after = new GetSnapshotsRequest.After(afterString[0], afterString[1]);
+        } else {
+            throw new IllegalArgumentException("illegal ?after value [" + Strings.arrayToCommaDelimitedString(afterString) +
+                    "] must be of the form '${sort_value},${snapshot_name}'");
+        }
+        getSnapshotsRequest.after(after);
+        final SortOrder order = SortOrder.fromString(request.param("order", getSnapshotsRequest.order().toString()));
+        getSnapshotsRequest.order(order);
         getSnapshotsRequest.masterNodeTimeout(request.paramAsTime("master_timeout", getSnapshotsRequest.masterNodeTimeout()));
         return channel -> new RestCancellableNodeClient(client, request.getHttpChannel()).admin().cluster()
                 .getSnapshots(getSnapshotsRequest, new RestToXContentListener<>(channel));

+ 53 - 0
server/src/test/java/org/elasticsearch/action/admin/cluster/snapshots/get/GetSnapshotsRequestTests.java

@@ -0,0 +1,53 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+package org.elasticsearch.action.admin.cluster.snapshots.get;
+
+import org.elasticsearch.action.ActionRequestValidationException;
+import org.elasticsearch.search.sort.SortOrder;
+import org.elasticsearch.test.ESTestCase;
+
+import static org.hamcrest.Matchers.containsString;
+
+public class GetSnapshotsRequestTests extends ESTestCase {
+
+    public void testValidateParameters() {
+        {
+            final GetSnapshotsRequest request = new GetSnapshotsRequest("repo", "snapshot");
+            assertNull(request.validate());
+            request.size(0);
+            final ActionRequestValidationException e = request.validate();
+            assertThat(e.getMessage(), containsString("size must be -1 or greater than 0"));
+        }
+        {
+            final GetSnapshotsRequest request = new GetSnapshotsRequest("repo", "snapshot").size(randomIntBetween(1, 500));
+            assertNull(request.validate());
+        }
+        {
+            final GetSnapshotsRequest request = new GetSnapshotsRequest("repo", "snapshot").verbose(false).size(randomIntBetween(1, 500));
+            final ActionRequestValidationException e = request.validate();
+            assertThat(e.getMessage(), containsString("can't use size limit with verbose=false"));
+        }
+        {
+            final GetSnapshotsRequest request = new GetSnapshotsRequest("repo", "snapshot").verbose(false)
+                .sort(GetSnapshotsRequest.SortBy.INDICES);
+            final ActionRequestValidationException e = request.validate();
+            assertThat(e.getMessage(), containsString("can't use non-default sort with verbose=false"));
+        }
+        {
+            final GetSnapshotsRequest request = new GetSnapshotsRequest("repo", "snapshot").verbose(false).order(SortOrder.DESC);
+            final ActionRequestValidationException e = request.validate();
+            assertThat(e.getMessage(), containsString("can't use non-default sort order with verbose=false"));
+        }
+        {
+            final GetSnapshotsRequest request = new GetSnapshotsRequest("repo", "snapshot").verbose(false)
+                .after(new GetSnapshotsRequest.After("foo", "bar"));
+            final ActionRequestValidationException e = request.validate();
+            assertThat(e.getMessage(), containsString("can't use after with verbose=false"));
+        }
+    }
+}

+ 75 - 7
test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java

@@ -13,6 +13,7 @@ import org.elasticsearch.action.ActionFuture;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionRunnable;
 import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
+import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsRequest;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.support.GroupedActionListener;
@@ -34,6 +35,7 @@ import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.xcontent.DeprecationHandler;
@@ -51,6 +53,7 @@ import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
 import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil;
 import org.elasticsearch.repositories.blobstore.ChecksumBlobStoreFormat;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.search.sort.SortOrder;
 import org.elasticsearch.snapshots.mockstore.MockRepository;
 import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.test.VersionUtils;
@@ -75,6 +78,7 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
 import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.stream.StreamSupport;
@@ -84,12 +88,15 @@ import static org.elasticsearch.snapshots.SnapshotsService.NO_FEATURE_STATES_VAL
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
 
 public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
 
-    private static final String OLD_VERSION_SNAPSHOT_PREFIX = "old-version-snapshot-";
+    public static final String RANDOM_SNAPSHOT_NAME_PREFIX = "snap-";
+
+    public static final String OLD_VERSION_SNAPSHOT_PREFIX = "old-version-snapshot-";
 
     // Large snapshot pool settings to set up nodes for tests involving multiple repositories that need to have enough
     // threads so that blocking some threads on one repository doesn't block other repositories from doing work
@@ -484,15 +491,24 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
 
     protected void awaitNoMoreRunningOperations(String viaNode) throws Exception {
         logger.info("--> verify no more operations in the cluster state");
-        awaitClusterState(viaNode, state -> state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY).entries().isEmpty() &&
-                state.custom(SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress.EMPTY).hasDeletionsInProgress() == false);
+        awaitClusterState(
+                logger,
+                viaNode,
+                state -> state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY).entries().isEmpty()
+                    && state.custom(SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress.EMPTY)
+                        .hasDeletionsInProgress() == false
+        );
     }
 
     protected void awaitClusterState(Predicate<ClusterState> statePredicate) throws Exception {
-        awaitClusterState(internalCluster().getMasterName(), statePredicate);
+        awaitClusterState(logger, internalCluster().getMasterName(), statePredicate);
     }
 
-    protected void awaitClusterState(String viaNode, Predicate<ClusterState> statePredicate) throws Exception {
+    public static void awaitClusterState(Logger logger, Predicate<ClusterState> statePredicate) throws Exception {
+        awaitClusterState(logger, internalCluster().getMasterName(), statePredicate);
+    }
+
+    public static void awaitClusterState(Logger logger, String viaNode, Predicate<ClusterState> statePredicate) throws Exception {
         final ClusterService clusterService = internalCluster().getInstance(ClusterService.class, viaNode);
         final ThreadPool threadPool = internalCluster().getInstance(ThreadPool.class, viaNode);
         final ClusterStateObserver observer = new ClusterStateObserver(clusterService, logger, threadPool.getThreadContext());
@@ -531,6 +547,13 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
     }
 
     protected ActionFuture<CreateSnapshotResponse> startFullSnapshot(String repoName, String snapshotName, boolean partial) {
+        return startFullSnapshot(logger, repoName, snapshotName, partial);
+    }
+
+    public static ActionFuture<CreateSnapshotResponse> startFullSnapshot(Logger logger,
+                                                                         String repoName,
+                                                                         String snapshotName,
+                                                                         boolean partial) {
         logger.info("--> creating full snapshot [{}] to repo [{}]", snapshotName, repoName);
         return clusterAdmin().prepareCreateSnapshot(repoName, snapshotName).setWaitForCompletion(true)
                 .setPartial(partial).execute();
@@ -542,14 +565,24 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
                 state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY).entries().size() == count);
     }
 
+    public static void awaitNumberOfSnapshotsInProgress(Logger logger, int count) throws Exception {
+        logger.info("--> wait for [{}] snapshots to show up in the cluster state", count);
+        awaitClusterState(logger, state ->
+                state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY).entries().size() == count);
+    }
+
     protected SnapshotInfo assertSuccessful(ActionFuture<CreateSnapshotResponse> future) throws Exception {
+        return assertSuccessful(logger, future);
+    }
+
+    public static SnapshotInfo assertSuccessful(Logger logger, ActionFuture<CreateSnapshotResponse> future) throws Exception {
         logger.info("--> wait for snapshot to finish");
         final SnapshotInfo snapshotInfo = future.get().getSnapshotInfo();
         assertThat(snapshotInfo.state(), is(SnapshotState.SUCCESS));
         return snapshotInfo;
     }
 
-    private static final Settings SINGLE_SHARD_NO_REPLICA = indexSettingsNoReplicas(1).build();
+    public static final Settings SINGLE_SHARD_NO_REPLICA = indexSettingsNoReplicas(1).build();
 
     protected void createIndexWithContent(String indexName) {
         createIndexWithContent(indexName, SINGLE_SHARD_NO_REPLICA);
@@ -617,7 +650,7 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
         final PlainActionFuture<Collection<CreateSnapshotResponse>> allSnapshotsDone = PlainActionFuture.newFuture();
         final ActionListener<CreateSnapshotResponse> snapshotsListener = new GroupedActionListener<>(allSnapshotsDone, count);
         final List<String> snapshotNames = new ArrayList<>(count);
-        final String prefix = "snap-" + UUIDs.randomBase64UUID(random()).toLowerCase(Locale.ROOT) + "-";
+        final String prefix = RANDOM_SNAPSHOT_NAME_PREFIX + UUIDs.randomBase64UUID(random()).toLowerCase(Locale.ROOT) + "-";
         for (int i = 0; i < count; i++) {
             final String snapshot = prefix + i;
             snapshotNames.add(snapshot);
@@ -640,4 +673,39 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
             }
         });
     }
+
+    public static void assertSnapshotListSorted(List<SnapshotInfo> snapshotInfos, @Nullable GetSnapshotsRequest.SortBy sort,
+                                                SortOrder sortOrder) {
+        final BiConsumer<SnapshotInfo, SnapshotInfo> assertion;
+        if (sort == null) {
+            assertion = (s1, s2) -> assertThat(s2, greaterThanOrEqualTo(s1));
+        } else {
+            switch (sort) {
+                case START_TIME:
+                    assertion = (s1, s2) -> assertThat(s2.startTime(), greaterThanOrEqualTo(s1.startTime()));
+                    break;
+                case NAME:
+                    assertion = (s1, s2) -> assertThat(s2.snapshotId().getName(), greaterThanOrEqualTo(s1.snapshotId().getName()));
+                    break;
+                case DURATION:
+                    assertion =
+                        (s1, s2) -> assertThat(s2.endTime() - s2.startTime(), greaterThanOrEqualTo(s1.endTime() - s1.startTime()));
+                    break;
+                case INDICES:
+                    assertion = (s1, s2) -> assertThat(s2.indices().size(), greaterThanOrEqualTo(s1.indices().size()));
+                    break;
+                default:
+                    throw new AssertionError("unknown sort column [" + sort + "]");
+            }
+        }
+        final BiConsumer<SnapshotInfo, SnapshotInfo> orderAssertion;
+        if (sortOrder == SortOrder.ASC) {
+            orderAssertion = assertion;
+        } else {
+            orderAssertion = (s1, s2) -> assertion.accept(s2, s1);
+        }
+        for (int i = 0; i < snapshotInfos.size() - 1; i++) {
+            orderAssertion.accept(snapshotInfos.get(i), snapshotInfos.get(i + 1));
+        }
+    }
 }