소스 검색

Add fleet polling API for global checkpoint (#71093)

Fleet server needs an API to access up to date global checkpoints for
indices. Additionally, it requires a mode of operation when fleet can
provide its current knowledge about the global checkpoints and poll for
advancements. This commit introduces this API in the fleet plugin.
Tim Brooks 4 년 전
부모
커밋
d5b96a35d0

+ 65 - 0
docs/reference/fleet/get-global-checkpoints.asciidoc

@@ -0,0 +1,65 @@
+[role="xpack"]
+[[get-global-checkpoints]]
+=== Get global checkpoints API
+++++
+<titleabbrev>Get global checkpoints</titleabbrev>
+++++
+
+The purpose of the get global checkpoints api is to return the current global
+checkpoints for an index. This API allows users to know the what sequence numbers
+have been safely persisted in Elasticsearch.
+
+The API has an optional polling mode enabled by the `wait_for_advance` query
+parameter. In polling mode, the API will only return after the global checkpoints
+advance past the provided `checkpoints`. By default, `checkpoints` is an empty
+array, which will lead to the API returning immediately.
+
+If a timeout occurs before the global checkpoints advance past the provided
+`checkpoints`, Elasticsearch will return the current global checkpoints and a
+boolean indicating that the request timed out.
+
+Currently the `wait_for_advance` parameter is only supported for one shard indices.
+
+[[get-global-checkpoints-api-request]]
+==== {api-request-title}
+
+`GET /<index>/_fleet/global_checkpoints`
+
+[[get-global-checkpoints-api-path-params]]
+==== {api-path-parms-title}
+
+`<index>`::
+(Required, string)
+A single index or index alias that resolves to a single index.
+
+[role="child_attributes"]
+[[get-global-checkpoints-api-query-parms]]
+==== {api-query-parms-title}
+
+`wait_for_advance`::
+(Optional, Boolean) A boolean value which controls whether to wait (until the
+`timeout`) for the global checkpoints to advance past the provided
+`checkpoints`.
+
+`checkpoints`::
+(Optional, list) A comma separated list of previous global checkpoints.
+When used in combination with `wait_for_advance`, the API will only return once
+the global checkpoints advances past the `checkpoints`. Defaults to an empty list
+which will cause Elasticsearch to immediately return the current global
+checkpoints.
+
+`timeout`::
+(Optional, <<time-units, time units>>)
+Period to wait for a global checkpoints to advance past `checkpoints`.
+Defaults to `30s`.
+
+[role="child_attributes"]
+[[get-global-checkpoints-api-response-body]]
+==== {api-response-body-title}
+
+`global_checkpoints`::
+(array of integers) The global checkpoints for the index.
+
+`timed_out`::
+(Boolean) If `false` the global checkpoints did not advance past the
+`checkpoints` within the specified `timeout`.

+ 13 - 0
docs/reference/fleet/index.asciidoc

@@ -0,0 +1,13 @@
+[role="xpack"]
+[[fleet-apis]]
+== Fleet APIs
+
+The following APIs are design to support fleet-server's usage of Elasticsearch as
+a datastore for internal agent and action data. These APIS are currently intended
+for internal use only and should be considered experimental.
+
+* <<get-global-checkpoints,Get global checkpoints>>
+
+// top-level
+include::get-global-checkpoints.asciidoc[]
+

+ 2 - 0
docs/reference/rest-api/index.asciidoc

@@ -21,6 +21,7 @@ not be included yet.
 * <<enrich-apis,Enrich APIs>>
 * <<graph-explore-api,Graph explore API>>
 * <<find-structure,Find structure API>>
+* <<fleet-apis,Fleet APIs>>
 * <<indices, Index APIs>>
 * <<index-lifecycle-management-api,Index lifecycle management APIs>>
 * <<ingest-apis,Ingest APIs>>
@@ -52,6 +53,7 @@ include::{es-repo-dir}/data-streams/data-stream-apis.asciidoc[]
 include::{es-repo-dir}/docs.asciidoc[]
 include::{es-repo-dir}/ingest/apis/enrich/index.asciidoc[]
 include::{es-repo-dir}/features/apis/features-apis.asciidoc[]
+include::{es-repo-dir}/fleet/index.asciidoc[]
 include::{es-repo-dir}/text-structure/apis/find-structure.asciidoc[leveloffset=+1]
 include::{es-repo-dir}/graph/explore.asciidoc[]
 include::{es-repo-dir}/indices.asciidoc[]

+ 47 - 0
rest-api-spec/src/main/resources/rest-api-spec/api/fleet.global_checkpoints.json

@@ -0,0 +1,47 @@
+{
+  "fleet.global_checkpoints":{
+    "documentation":{
+      "url": null,
+      "description": "Returns the current global checkpoints for an index. This API is design for internal use by the fleet server project."
+    },
+    "stability":"experimental",
+    "visibility":"public",
+    "headers":{
+      "accept": [ "application/json"],
+      "content_type": ["application/json"]
+    },
+    "url":{
+      "paths":[
+        {
+          "path":"/{index}/_fleet/global_checkpoints",
+          "methods":[
+            "GET"
+          ],
+          "parts":{
+            "index":{
+              "type":"string",
+              "description":"The name of the index."
+            }
+          }
+        }
+      ]
+    },
+    "params":{
+      "wait_for_advance":{
+        "type":"boolean",
+        "description":"Whether to wait for the global checkpoint to advance past the specified current checkpoints",
+        "default":"false"
+      },
+      "checkpoints":{
+        "type":"list",
+        "description":"Comma separated list of checkpoints",
+        "default":""
+      },
+      "timeout":{
+        "type":"time",
+        "description":"Timeout to wait for global checkpoint to advance",
+        "default":"30s"
+      }
+    }
+  }
+}

+ 1 - 0
x-pack/plugin/fleet/build.gradle

@@ -6,6 +6,7 @@
  */
 
 apply plugin: 'elasticsearch.esplugin'
+apply plugin: 'elasticsearch.internal-cluster-test'
 apply plugin: 'elasticsearch.java-rest-test'
 
 esplugin {

+ 8 - 0
x-pack/plugin/fleet/qa/build.gradle

@@ -0,0 +1,8 @@
+import org.elasticsearch.gradle.test.RestIntegTestTask
+
+apply plugin: 'elasticsearch.build'
+tasks.named("test").configure { enabled = false }
+
+dependencies {
+  api project(':test:framework')
+}

+ 23 - 0
x-pack/plugin/fleet/qa/rest/build.gradle

@@ -0,0 +1,23 @@
+import org.elasticsearch.gradle.info.BuildParams
+
+apply plugin: 'elasticsearch.yaml-rest-test'
+
+restResources {
+  restApi {
+    include '_common', 'indices', 'index', 'fleet'
+  }
+}
+
+dependencies {
+  yamlRestTestImplementation(testArtifact(project(xpackModule('core'))))
+}
+
+testClusters.all {
+  testDistribution = 'DEFAULT'
+  setting 'xpack.security.enabled', 'false'
+}
+
+if (BuildParams.inFipsJvm){
+  // Test clusters run with security disabled
+  tasks.named("yamlRestTest").configure{enabled = false }
+}

+ 24 - 0
x-pack/plugin/fleet/qa/rest/src/yamlRestTest/java/org/elasticsearch/xpack/fleet/FleetRestIT.java

@@ -0,0 +1,24 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.fleet;
+
+import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
+import org.elasticsearch.test.rest.yaml.ClientYamlTestCandidate;
+import org.elasticsearch.test.rest.yaml.ESClientYamlSuiteTestCase;
+
+public class FleetRestIT extends ESClientYamlSuiteTestCase {
+
+    public FleetRestIT(final ClientYamlTestCandidate testCandidate) {
+        super(testCandidate);
+    }
+
+    @ParametersFactory
+    public static Iterable<Object[]> parameters() throws Exception {
+        return ESClientYamlSuiteTestCase.createParameters();
+    }
+}

+ 68 - 0
x-pack/plugin/fleet/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/fleet/global_checkpoints.yml

@@ -0,0 +1,68 @@
+setup:
+  - do:
+      indices.create:
+        index: test-2-shards
+        body:
+          settings:
+            number_of_shards: "2"
+
+  - do:
+      indices.create:
+        index: test-1-shard
+        body:
+          settings:
+            number_of_shards: "1"
+
+  - do:
+      index:
+        index:  test-1-shard
+        body:   { }
+
+  - do:
+      index:
+        index: test-1-shard
+        body: { }
+
+  - do:
+      indices.refresh: {}
+
+---
+"Get Global Checkpoints":
+  - do:
+      fleet.global_checkpoints:
+        index: "test-2-shards"
+
+  - match: { global_checkpoints.0: -1 }
+  - match: { global_checkpoints.1: -1 }
+
+---
+"Get Global Checkpoints after advance":
+  - do:
+      fleet.global_checkpoints:
+        index: "test-1-shard"
+
+  - match: { global_checkpoints.0: 1 }
+
+---
+"Advance timeout":
+  - do:
+      fleet.global_checkpoints:
+        index: "test-1-shard"
+        wait_for_advance: true
+        checkpoints: [100]
+        timeout: "50ms"
+
+  - match: { global_checkpoints.0: 1 }
+  - match: { timed_out: true }
+
+---
+"Advance no timeout":
+  - do:
+      fleet.global_checkpoints:
+        index: "test-1-shard"
+        wait_for_advance: true
+        checkpoints: [0]
+        timeout: "50ms"
+
+  - match: { global_checkpoints.0: 1 }
+  - match: { timed_out: false }

+ 245 - 0
x-pack/plugin/fleet/src/internalClusterTest/java/org/elasticsearch/xpack/fleet/action/GetGlobalCheckpointsActionIT.java

@@ -0,0 +1,245 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.fleet.action;
+
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.ElasticsearchStatusException;
+import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.index.IndexNotFoundException;
+import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.index.translog.Translog;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.test.ESIntegTestCase;
+import org.elasticsearch.xpack.fleet.Fleet;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.lessThan;
+
+public class GetGlobalCheckpointsActionIT extends ESIntegTestCase {
+
+    public static final TimeValue TEN_SECONDS = TimeValue.timeValueSeconds(10);
+    public static final long[] EMPTY_ARRAY = new long[0];
+
+    @Override
+    protected Collection<Class<? extends Plugin>> nodePlugins() {
+        return Stream.of(Fleet.class).collect(Collectors.toList());
+    }
+
+    public void testGetGlobalCheckpoints() throws Exception {
+        int shards = between(1, 5);
+        String indexName = "test_index";
+        client().admin()
+            .indices()
+            .prepareCreate(indexName)
+            .setSettings(
+                Settings.builder()
+                    // ESIntegTestCase randomizes durability settings. The global checkpoint only advances after a fsync, hence we
+                    // must run with REQUEST durability
+                    .put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.REQUEST)
+                    .put("index.number_of_shards", shards)
+                    .put("index.number_of_replicas", 1)
+            )
+            .get();
+
+        final GetGlobalCheckpointsAction.Request request = new GetGlobalCheckpointsAction.Request(
+            indexName,
+            false,
+            EMPTY_ARRAY,
+            TimeValue.parseTimeValue(randomTimeValue(), "test")
+        );
+        final GetGlobalCheckpointsAction.Response response = client().execute(GetGlobalCheckpointsAction.INSTANCE, request).get();
+        long[] expected = new long[shards];
+        for (int i = 0; i < shards; ++i) {
+            expected[i] = -1;
+        }
+        assertArrayEquals(expected, response.globalCheckpoints());
+
+        final int totalDocuments = shards * 3;
+        for (int i = 0; i < totalDocuments; ++i) {
+            client().prepareIndex(indexName).setId(Integer.toString(i)).setSource("{}", XContentType.JSON).get();
+        }
+
+        final GetGlobalCheckpointsAction.Request request2 = new GetGlobalCheckpointsAction.Request(
+            indexName,
+            false,
+            EMPTY_ARRAY,
+            TimeValue.parseTimeValue(randomTimeValue(), "test")
+        );
+        final GetGlobalCheckpointsAction.Response response2 = client().execute(GetGlobalCheckpointsAction.INSTANCE, request2).get();
+
+        assertEquals(totalDocuments, Arrays.stream(response2.globalCheckpoints()).map(s -> s + 1).sum());
+
+        client().admin().indices().prepareRefresh(indexName).get();
+
+        final IndicesStatsResponse statsResponse = client().admin().indices().prepareStats(indexName).get();
+        long[] fromStats = Arrays.stream(statsResponse.getShards())
+            .filter(i -> i.getShardRouting().primary())
+            .sorted(Comparator.comparingInt(value -> value.getShardRouting().id()))
+            .mapToLong(s -> s.getSeqNoStats().getGlobalCheckpoint())
+            .toArray();
+        assertArrayEquals(fromStats, response2.globalCheckpoints());
+    }
+
+    public void testPollGlobalCheckpointAdvancement() throws Exception {
+        String indexName = "test_index";
+        client().admin()
+            .indices()
+            .prepareCreate(indexName)
+            .setSettings(
+                Settings.builder()
+                    .put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.REQUEST)
+                    .put("index.number_of_shards", 1)
+                    .put("index.number_of_replicas", 1)
+            )
+            .get();
+
+        final GetGlobalCheckpointsAction.Request request = new GetGlobalCheckpointsAction.Request(
+            indexName,
+            false,
+            EMPTY_ARRAY,
+            TEN_SECONDS
+        );
+        final GetGlobalCheckpointsAction.Response response = client().execute(GetGlobalCheckpointsAction.INSTANCE, request).get();
+        assertEquals(-1, response.globalCheckpoints()[0]);
+
+        final int totalDocuments = between(25, 50);
+        new Thread(() -> {
+            for (int i = 0; i < totalDocuments; ++i) {
+                client().prepareIndex(indexName).setId(Integer.toString(i)).setSource("{}", XContentType.JSON).execute();
+            }
+        }).start();
+
+        final GetGlobalCheckpointsAction.Request request2 = new GetGlobalCheckpointsAction.Request(
+            indexName,
+            true,
+            new long[] { totalDocuments - 2 },
+            TimeValue.timeValueSeconds(30)
+        );
+        long start = System.nanoTime();
+        final GetGlobalCheckpointsAction.Response response2 = client().execute(GetGlobalCheckpointsAction.INSTANCE, request2).get();
+        long elapsed = TimeValue.timeValueNanos(System.nanoTime() - start).seconds();
+
+        assertThat(elapsed, lessThan(30L));
+        assertFalse(response.timedOut());
+        assertEquals(totalDocuments - 1, response2.globalCheckpoints()[0]);
+
+    }
+
+    public void testPollGlobalCheckpointAdvancementTimeout() throws Exception {
+        String indexName = "test_index";
+        client().admin()
+            .indices()
+            .prepareCreate(indexName)
+            .setSettings(
+                Settings.builder()
+                    .put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.REQUEST)
+                    .put("index.number_of_shards", 1)
+                    .put("index.number_of_replicas", 0)
+            )
+            .get();
+
+        final int totalDocuments = 30;
+        for (int i = 0; i < totalDocuments; ++i) {
+            client().prepareIndex(indexName).setId(Integer.toString(i)).setSource("{}", XContentType.JSON).get();
+        }
+
+        final GetGlobalCheckpointsAction.Request request = new GetGlobalCheckpointsAction.Request(
+            indexName,
+            true,
+            new long[] { 29 },
+            TimeValue.timeValueMillis(between(0, 100))
+        );
+        long start = System.nanoTime();
+        GetGlobalCheckpointsAction.Response response = client().execute(GetGlobalCheckpointsAction.INSTANCE, request).actionGet();
+        long elapsed = TimeValue.timeValueNanos(System.nanoTime() - start).seconds();
+        assertThat(elapsed, lessThan(30L));
+        assertTrue(response.timedOut());
+        assertEquals(29L, response.globalCheckpoints()[0]);
+    }
+
+    public void testMustProvideCorrectNumberOfShards() throws Exception {
+        String indexName = "test_index";
+        client().admin()
+            .indices()
+            .prepareCreate(indexName)
+            .setSettings(
+                Settings.builder()
+                    .put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.REQUEST)
+                    .put("index.number_of_shards", 1)
+                    .put("index.number_of_replicas", 0)
+            )
+            .get();
+
+        final long[] incorrectArrayLength = new long[2];
+        final GetGlobalCheckpointsAction.Request request = new GetGlobalCheckpointsAction.Request(
+            indexName,
+            true,
+            incorrectArrayLength,
+            TEN_SECONDS
+        );
+        ElasticsearchStatusException exception = expectThrows(
+            ElasticsearchStatusException.class,
+            () -> client().execute(GetGlobalCheckpointsAction.INSTANCE, request).actionGet()
+        );
+        assertThat(exception.status(), equalTo(RestStatus.BAD_REQUEST));
+        assertThat(
+            exception.getMessage(),
+            equalTo("number of checkpoints must equal number of shards. [shard count: 1, checkpoint count: 2]")
+        );
+    }
+
+    public void testWaitForAdvanceOnlySupportsOneShard() throws Exception {
+        String indexName = "test_index";
+        client().admin()
+            .indices()
+            .prepareCreate(indexName)
+            .setSettings(
+                Settings.builder()
+                    .put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.REQUEST)
+                    .put("index.number_of_shards", 3)
+                    .put("index.number_of_replicas", 0)
+            )
+            .get();
+
+        final GetGlobalCheckpointsAction.Request request = new GetGlobalCheckpointsAction.Request(
+            indexName,
+            true,
+            new long[3],
+            TEN_SECONDS
+        );
+        ElasticsearchStatusException exception = expectThrows(
+            ElasticsearchStatusException.class,
+            () -> client().execute(GetGlobalCheckpointsAction.INSTANCE, request).actionGet()
+        );
+        assertThat(exception.status(), equalTo(RestStatus.BAD_REQUEST));
+        assertThat(exception.getMessage(), equalTo("wait_for_advance only supports indices with one shard. [shard count: 3]"));
+    }
+
+    public void testIndexDoesNotExist() throws Exception {
+        final GetGlobalCheckpointsAction.Request request = new GetGlobalCheckpointsAction.Request(
+            "non-existent",
+            false,
+            EMPTY_ARRAY,
+            TimeValue.parseTimeValue(randomTimeValue(), "test")
+        );
+        ElasticsearchException exception = expectThrows(
+            IndexNotFoundException.class,
+            () -> client().execute(GetGlobalCheckpointsAction.INSTANCE, request).actionGet()
+        );
+    }
+}

+ 35 - 0
x-pack/plugin/fleet/src/main/java/org/elasticsearch/xpack/fleet/Fleet.java

@@ -8,17 +8,31 @@
 package org.elasticsearch.xpack.fleet;
 
 import org.elasticsearch.Version;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.ActionResponse;
 import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
+import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.common.settings.ClusterSettings;
+import org.elasticsearch.common.settings.IndexScopedSettings;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.settings.SettingsFilter;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.indices.SystemIndexDescriptor;
 import org.elasticsearch.indices.SystemIndexDescriptor.Type;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.plugins.SystemIndexPlugin;
+import org.elasticsearch.rest.RestController;
+import org.elasticsearch.rest.RestHandler;
 import org.elasticsearch.xpack.core.template.TemplateUtils;
+import org.elasticsearch.xpack.fleet.action.GetGlobalCheckpointsAction;
+import org.elasticsearch.xpack.fleet.action.GetGlobalCheckpointsShardAction;
+import org.elasticsearch.xpack.fleet.rest.RestGetGlobalCheckpointsAction;
 
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
+import java.util.function.Supplier;
 
 import static org.elasticsearch.xpack.core.ClientHelper.FLEET_ORIGIN;
 
@@ -186,4 +200,25 @@ public class Fleet extends Plugin implements SystemIndexPlugin {
     private String loadTemplateSource(String resource) {
         return TemplateUtils.loadTemplate(resource, Version.CURRENT.toString(), MAPPING_VERSION_VARIABLE);
     }
+
+    @Override
+    public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
+        return Arrays.asList(
+            new ActionHandler<>(GetGlobalCheckpointsAction.INSTANCE, GetGlobalCheckpointsAction.TransportAction.class),
+            new ActionHandler<>(GetGlobalCheckpointsShardAction.INSTANCE, GetGlobalCheckpointsShardAction.TransportAction.class)
+        );
+    }
+
+    @Override
+    public List<RestHandler> getRestHandlers(
+        Settings settings,
+        RestController restController,
+        ClusterSettings clusterSettings,
+        IndexScopedSettings indexScopedSettings,
+        SettingsFilter settingsFilter,
+        IndexNameExpressionResolver indexNameExpressionResolver,
+        Supplier<DiscoveryNodes> nodesInCluster
+    ) {
+        return Arrays.asList(new RestGetGlobalCheckpointsAction());
+    }
 }

+ 246 - 0
x-pack/plugin/fleet/src/main/java/org/elasticsearch/xpack/fleet/action/GetGlobalCheckpointsAction.java

@@ -0,0 +1,246 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.fleet.action;
+
+import org.elasticsearch.ElasticsearchStatusException;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.ActionRequestValidationException;
+import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.action.ActionType;
+import org.elasticsearch.action.IndicesRequest;
+import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.IndicesOptions;
+import org.elasticsearch.client.node.NodeClient;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.concurrent.AtomicArray;
+import org.elasticsearch.common.util.concurrent.CountDown;
+import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.common.xcontent.ToXContentObject;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.index.Index;
+import org.elasticsearch.index.IndexNotFoundException;
+import org.elasticsearch.index.seqno.SequenceNumbers;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.tasks.Task;
+import org.elasticsearch.transport.TransportService;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class GetGlobalCheckpointsAction extends ActionType<GetGlobalCheckpointsAction.Response> {
+
+    public static final GetGlobalCheckpointsAction INSTANCE = new GetGlobalCheckpointsAction();
+    public static final String NAME = "indices:monitor/fleet/global_checkpoints";
+
+    private GetGlobalCheckpointsAction() {
+        super(NAME, GetGlobalCheckpointsAction.Response::new);
+    }
+
+    public static class Response extends ActionResponse implements ToXContentObject {
+
+        private final boolean timedOut;
+        private final long[] globalCheckpoints;
+
+        public Response(boolean timedOut, long[] globalCheckpoints) {
+            this.timedOut = timedOut;
+            this.globalCheckpoints = globalCheckpoints;
+        }
+
+        public Response(StreamInput in) {
+            throw new AssertionError("GetGlobalCheckpointsAction should not be sent over the wire.");
+        }
+
+        public long[] globalCheckpoints() {
+            return globalCheckpoints;
+        }
+
+        public boolean timedOut() {
+            return timedOut;
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            throw new AssertionError("GetGlobalCheckpointsAction should not be sent over the wire.");
+        }
+
+        @Override
+        public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
+            builder.startObject();
+            builder.field("timed_out", timedOut);
+            builder.array("global_checkpoints", globalCheckpoints);
+            return builder.endObject();
+        }
+    }
+
+    public static class Request extends ActionRequest implements IndicesRequest {
+
+        private final String index;
+        private final boolean waitForAdvance;
+        private final long[] checkpoints;
+        private final TimeValue timeout;
+
+        public Request(String index, boolean waitForAdvance, long[] checkpoints, TimeValue timeout) {
+            this.index = index;
+            this.waitForAdvance = waitForAdvance;
+            this.checkpoints = checkpoints;
+            this.timeout = timeout;
+        }
+
+        @Override
+        public ActionRequestValidationException validate() {
+            if (Arrays.stream(checkpoints).anyMatch(l -> l < -1)) {
+                ActionRequestValidationException e = new ActionRequestValidationException();
+                e.addValidationError("All checkpoints must be >= -1. Found: " + Arrays.toString(checkpoints));
+                return e;
+            }
+            return null;
+        }
+
+        public TimeValue timeout() {
+            return timeout;
+        }
+
+        public boolean waitForAdvance() {
+            return waitForAdvance;
+        }
+
+        public long[] checkpoints() {
+            return checkpoints;
+        }
+
+        @Override
+        public String[] indices() {
+            return new String[] { index };
+        }
+
+        @Override
+        public IndicesOptions indicesOptions() {
+            return IndicesOptions.strictSingleIndexNoExpandForbidClosed();
+        }
+    }
+
+    public static class TransportAction extends org.elasticsearch.action.support.TransportAction<Request, Response> {
+
+        private final ClusterService clusterService;
+        private final NodeClient client;
+        private final IndexNameExpressionResolver resolver;
+
+        @Inject
+        public TransportAction(
+            final ActionFilters actionFilters,
+            final TransportService transportService,
+            final ClusterService clusterService,
+            final NodeClient client,
+            final IndexNameExpressionResolver resolver
+        ) {
+            super(NAME, actionFilters, transportService.getTaskManager());
+            this.clusterService = clusterService;
+            this.client = client;
+            this.resolver = resolver;
+        }
+
+        @Override
+        protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
+            final ClusterState state = clusterService.state();
+            final Index index = resolver.concreteSingleIndex(state, request);
+            final IndexMetadata indexMetadata = state.getMetadata().index(index);
+
+            if (indexMetadata == null) {
+                // Index not found
+                listener.onFailure(new IndexNotFoundException(request.index));
+                return;
+            }
+
+            final int numberOfShards = indexMetadata.getNumberOfShards();
+
+            if (request.waitForAdvance() && numberOfShards != 1) {
+                listener.onFailure(
+                    new ElasticsearchStatusException(
+                        "wait_for_advance only supports indices with one shard. " + "[shard count: " + numberOfShards + "]",
+                        RestStatus.BAD_REQUEST
+                    )
+                );
+                return;
+            }
+
+            final long[] checkpoints;
+            final int currentCheckpointCount = request.checkpoints().length;
+            if (currentCheckpointCount != 0) {
+                if (currentCheckpointCount != numberOfShards) {
+                    listener.onFailure(
+                        new ElasticsearchStatusException(
+                            "number of checkpoints must equal number of shards. "
+                                + "[shard count: "
+                                + numberOfShards
+                                + ", checkpoint count: "
+                                + currentCheckpointCount
+                                + "]",
+                            RestStatus.BAD_REQUEST
+                        )
+                    );
+                    return;
+                }
+                checkpoints = request.checkpoints();
+            } else {
+                checkpoints = new long[numberOfShards];
+                for (int i = 0; i < numberOfShards; ++i) {
+                    checkpoints[i] = SequenceNumbers.NO_OPS_PERFORMED;
+                }
+            }
+
+            final AtomicArray<GetGlobalCheckpointsShardAction.Response> responses = new AtomicArray<>(numberOfShards);
+            final AtomicBoolean timedOut = new AtomicBoolean(false);
+            final CountDown countDown = new CountDown(numberOfShards);
+            for (int i = 0; i < numberOfShards; ++i) {
+                final int shardIndex = i;
+                GetGlobalCheckpointsShardAction.Request shardChangesRequest = new GetGlobalCheckpointsShardAction.Request(
+                    new ShardId(indexMetadata.getIndex(), shardIndex),
+                    request.waitForAdvance(),
+                    checkpoints[shardIndex],
+                    request.timeout()
+                );
+
+                client.execute(GetGlobalCheckpointsShardAction.INSTANCE, shardChangesRequest, new ActionListener<>() {
+                    @Override
+                    public void onResponse(GetGlobalCheckpointsShardAction.Response response) {
+                        assert responses.get(shardIndex) == null : "Already have a response for shard [" + shardIndex + "]";
+                        if (response.timedOut()) {
+                            timedOut.set(true);
+                        }
+                        responses.set(shardIndex, response);
+                        if (countDown.countDown()) {
+                            long[] globalCheckpoints = new long[responses.length()];
+                            int i = 0;
+                            for (GetGlobalCheckpointsShardAction.Response r : responses.asList()) {
+                                globalCheckpoints[i++] = r.getGlobalCheckpoint();
+                            }
+                            listener.onResponse(new Response(timedOut.get(), globalCheckpoints));
+                        }
+                    }
+
+                    @Override
+                    public void onFailure(Exception e) {
+                        if (countDown.fastForward()) {
+                            listener.onFailure(e);
+                        }
+                    }
+                });
+            }
+        }
+    }
+}

+ 246 - 0
x-pack/plugin/fleet/src/main/java/org/elasticsearch/xpack/fleet/action/GetGlobalCheckpointsShardAction.java

@@ -0,0 +1,246 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.fleet.action;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionRequestValidationException;
+import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.action.ActionType;
+import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.single.shard.SingleShardRequest;
+import org.elasticsearch.action.support.single.shard.TransportSingleShardAction;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.cluster.routing.ShardsIterator;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.index.IndexService;
+import org.elasticsearch.index.seqno.SeqNoStats;
+import org.elasticsearch.index.shard.GlobalCheckpointListeners;
+import org.elasticsearch.index.shard.IndexShard;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.indices.IndicesService;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.TransportService;
+
+import java.io.IOException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeoutException;
+
+import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
+
+public class GetGlobalCheckpointsShardAction extends ActionType<GetGlobalCheckpointsShardAction.Response> {
+
+    public static final GetGlobalCheckpointsShardAction INSTANCE = new GetGlobalCheckpointsShardAction();
+    public static final String NAME = "indices:monitor/fleet/global_checkpoints[s]";
+
+    private GetGlobalCheckpointsShardAction() {
+        super(NAME, GetGlobalCheckpointsShardAction.Response::new);
+    }
+
+    public static class Response extends ActionResponse {
+
+        private final long globalCheckpoint;
+        private final boolean timedOut;
+
+        public Response(long globalCheckpoint, boolean timedOut) {
+            this.globalCheckpoint = globalCheckpoint;
+            this.timedOut = timedOut;
+        }
+
+        public Response(StreamInput in) throws IOException {
+            super(in);
+            globalCheckpoint = in.readLong();
+            timedOut = in.readBoolean();
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            out.writeLong(globalCheckpoint);
+            out.writeBoolean(timedOut);
+        }
+
+        public long getGlobalCheckpoint() {
+            return globalCheckpoint;
+        }
+
+        public boolean timedOut() {
+            return timedOut;
+        }
+    }
+
+    public static class Request extends SingleShardRequest<Request> {
+
+        private final ShardId shardId;
+        private final boolean waitForAdvance;
+        private final long checkpoint;
+        private final TimeValue timeout;
+
+        Request(ShardId shardId, boolean waitForAdvance, long checkpoint, TimeValue timeout) {
+            super(shardId.getIndexName());
+            this.shardId = shardId;
+            this.waitForAdvance = waitForAdvance;
+            this.checkpoint = checkpoint;
+            this.timeout = timeout;
+        }
+
+        Request(StreamInput in) throws IOException {
+            super(in);
+            this.shardId = new ShardId(in);
+            this.waitForAdvance = in.readBoolean();
+            this.checkpoint = in.readLong();
+            this.timeout = in.readTimeValue();
+        }
+
+        @Override
+        public ActionRequestValidationException validate() {
+            return null;
+        }
+
+        public ShardId getShardId() {
+            return shardId;
+        }
+
+        public TimeValue timeout() {
+            return timeout;
+        }
+
+        public boolean waitForAdvance() {
+            return waitForAdvance;
+        }
+
+        public long checkpoint() {
+            return checkpoint;
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            super.writeTo(out);
+            shardId.writeTo(out);
+            out.writeBoolean(waitForAdvance);
+            out.writeLong(checkpoint);
+            out.writeTimeValue(timeout);
+        }
+    }
+
+    public static class TransportAction extends TransportSingleShardAction<Request, Response> {
+
+        private final IndicesService indicesService;
+
+        @Inject
+        public TransportAction(
+            ThreadPool threadPool,
+            ClusterService clusterService,
+            TransportService transportService,
+            ActionFilters actionFilters,
+            IndexNameExpressionResolver indexNameExpressionResolver,
+            IndicesService indicesService
+        ) {
+            super(
+                NAME,
+                threadPool,
+                clusterService,
+                transportService,
+                actionFilters,
+                indexNameExpressionResolver,
+                Request::new,
+                ThreadPool.Names.GENERIC
+            );
+            this.indicesService = indicesService;
+        }
+
+        @Override
+        protected Response shardOperation(Request request, ShardId shardId) {
+            final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
+            final IndexShard indexShard = indexService.getShard(shardId.id());
+            final SeqNoStats seqNoStats = indexShard.seqNoStats();
+            return new Response(seqNoStats.getGlobalCheckpoint(), false);
+        }
+
+        @Override
+        protected void asyncShardOperation(Request request, ShardId shardId, ActionListener<Response> listener) throws IOException {
+            final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
+            final IndexShard indexShard = indexService.getShard(shardId.id());
+            final SeqNoStats seqNoStats = indexShard.seqNoStats();
+
+            if (request.waitForAdvance() && request.checkpoint() >= seqNoStats.getGlobalCheckpoint()) {
+                indexShard.addGlobalCheckpointListener(request.checkpoint() + 1, new GlobalCheckpointListeners.GlobalCheckpointListener() {
+
+                    @Override
+                    public Executor executor() {
+                        return threadPool.executor(ThreadPool.Names.GENERIC);
+                    }
+
+                    @Override
+                    public void accept(final long g, final Exception e) {
+                        if (g != UNASSIGNED_SEQ_NO) {
+                            assert request.checkpoint() < g
+                                : shardId + " only advanced to [" + g + "] while waiting for [" + request.checkpoint() + "]";
+                            globalCheckpointAdvanced(shardId, request, listener);
+                        } else {
+                            assert e != null;
+                            globalCheckpointAdvancementFailure(indexShard, request, e, listener);
+                        }
+                    }
+
+                }, request.timeout());
+            } else {
+                super.asyncShardOperation(request, shardId, listener);
+            }
+        }
+
+        private void globalCheckpointAdvanced(final ShardId shardId, final Request request, final ActionListener<Response> listener) {
+            try {
+                super.asyncShardOperation(request, shardId, listener);
+            } catch (final IOException caught) {
+                listener.onFailure(caught);
+            }
+        }
+
+        private void globalCheckpointAdvancementFailure(
+            final IndexShard indexShard,
+            final Request request,
+            final Exception e,
+            final ActionListener<Response> listener
+        ) {
+            try {
+                if (e instanceof TimeoutException) {
+                    final long globalCheckpoint = indexShard.seqNoStats().getGlobalCheckpoint();
+                    if (request.checkpoint() >= globalCheckpoint) {
+                        listener.onResponse(new Response(globalCheckpoint, true));
+                    } else {
+                        listener.onResponse(new Response(globalCheckpoint, false));
+                    }
+                } else {
+                    listener.onFailure(e);
+                }
+            } catch (RuntimeException e2) {
+                listener.onFailure(e2);
+            }
+        }
+
+        @Override
+        protected Writeable.Reader<Response> getResponseReader() {
+            return Response::new;
+        }
+
+        @Override
+        protected boolean resolveIndex(Request request) {
+            return true;
+        }
+
+        @Override
+        protected ShardsIterator shards(ClusterState state, InternalRequest request) {
+            return state.routingTable().shardRoutingTable(request.request().getShardId()).primaryShardIt();
+        }
+    }
+}

+ 51 - 0
x-pack/plugin/fleet/src/main/java/org/elasticsearch/xpack/fleet/rest/RestGetGlobalCheckpointsAction.java

@@ -0,0 +1,51 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.fleet.rest;
+
+import org.elasticsearch.client.node.NodeClient;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.rest.BaseRestHandler;
+import org.elasticsearch.rest.RestRequest;
+import org.elasticsearch.rest.action.RestToXContentListener;
+import org.elasticsearch.xpack.fleet.action.GetGlobalCheckpointsAction;
+
+import java.util.List;
+
+import static org.elasticsearch.rest.RestRequest.Method.GET;
+
+public class RestGetGlobalCheckpointsAction extends BaseRestHandler {
+
+    @Override
+    public String getName() {
+        return "fleet_get_global_checkpoints";
+    }
+
+    @Override
+    public List<Route> routes() {
+        return List.of(new Route(GET, "/{index}/_fleet/global_checkpoints"));
+    }
+
+    @Override
+    protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
+        final String index = request.param("index");
+        final boolean waitForAdvance = request.paramAsBoolean("wait_for_advance", false);
+        final String[] stringCheckpoints = request.paramAsStringArray("checkpoints", new String[0]);
+        final long[] checkpoints = new long[stringCheckpoints.length];
+        for (int i = 0; i < stringCheckpoints.length; ++i) {
+            checkpoints[i] = Long.parseLong(stringCheckpoints[i]);
+        }
+        final TimeValue pollTimeout = request.paramAsTime("timeout", TimeValue.timeValueSeconds(30));
+        GetGlobalCheckpointsAction.Request getCheckpointsRequest = new GetGlobalCheckpointsAction.Request(
+            index,
+            waitForAdvance,
+            checkpoints,
+            pollTimeout
+        );
+        return channel -> client.execute(GetGlobalCheckpointsAction.INSTANCE, getCheckpointsRequest, new RestToXContentListener<>(channel));
+    }
+}

+ 2 - 0
x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java

@@ -423,6 +423,8 @@ public class Constants {
         "indices:data/write/update",
         "indices:data/write/update/byquery",
         "indices:monitor/data_stream/stats",
+        "indices:monitor/fleet/global_checkpoints[s]",
+        "indices:monitor/fleet/global_checkpoints",
         "indices:monitor/recovery",
         "indices:monitor/segments",
         "indices:monitor/settings/get",