Explorar el Código

Expose CCR stats to monitoring (#33617)

This commit exposes the CCR stats endpoint to monitoring collection.

Co-authored-by: Martijn van Groningen <martijn.v.groningen@gmail.com>
Jason Tedor hace 7 años
padre
commit
23f12e42c1
Se han modificado 19 ficheros con 728 adiciones y 104 borrados
  1. 1 1
      x-pack/plugin/ccr/qa/multi-cluster-with-security/build.gradle
  2. 44 0
      x-pack/plugin/ccr/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java
  3. 1 0
      x-pack/plugin/ccr/qa/multi-cluster/build.gradle
  4. 36 0
      x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java
  5. 8 8
      x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java
  6. 2 6
      x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java
  7. 1 0
      x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java
  8. 16 16
      x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportCcrStatsAction.java
  9. 1 1
      x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestCcrStatsAction.java
  10. 2 2
      x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java
  11. 6 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackSettings.java
  12. 50 45
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/ShardFollowNodeTaskStatus.java
  13. 25 21
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/CcrStatsAction.java
  14. 4 4
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/client/CcrClient.java
  15. 3 0
      x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/Monitoring.java
  16. 89 0
      x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsCollector.java
  17. 47 0
      x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsMonitoringDoc.java
  18. 217 0
      x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsCollectorTests.java
  19. 175 0
      x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsMonitoringDocTests.java

+ 1 - 1
x-pack/plugin/ccr/qa/multi-cluster-with-security/build.gradle

@@ -47,7 +47,7 @@ followClusterTestCluster {
     setting 'cluster.remote.leader_cluster.seeds', "\"${-> leaderClusterTest.nodes.get(0).transportUri()}\""
     setting 'xpack.license.self_generated.type', 'trial'
     setting 'xpack.security.enabled', 'true'
-    setting 'xpack.monitoring.enabled', 'false'
+    setting 'xpack.monitoring.collection.enabled', 'true'
     extraConfigFile 'roles.yml', 'roles.yml'
     setupCommand 'setupTestAdmin',
             'bin/elasticsearch-users', 'useradd', "test_admin", '-p', 'x-pack-test-password', '-r', "superuser"

+ 44 - 0
x-pack/plugin/ccr/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java

@@ -30,6 +30,7 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.is;
 
 public class FollowIndexSecurityIT extends ESRestTestCase {
@@ -80,6 +81,7 @@ public class FollowIndexSecurityIT extends ESRestTestCase {
             createAndFollowIndex("leader_cluster:" + allowedIndex, allowedIndex);
             assertBusy(() -> verifyDocuments(client(), allowedIndex, numDocs));
             assertThat(countCcrNodeTasks(), equalTo(1));
+            assertBusy(() -> verifyCcrMonitoring(allowedIndex));
             assertOK(client().performRequest(new Request("POST", "/" + allowedIndex + "/_ccr/unfollow")));
             // Make sure that there are no other ccr relates operations running:
             assertBusy(() -> {
@@ -203,4 +205,46 @@ public class FollowIndexSecurityIT extends ESRestTestCase {
         return RestStatus.OK.getStatus() == response.getStatusLine().getStatusCode();
     }
 
+    private static void verifyCcrMonitoring(String expectedLeaderIndex) throws IOException {
+        ensureYellow(".monitoring-*");
+
+        Request request = new Request("GET", "/.monitoring-*/_search");
+        request.setJsonEntity("{\"query\": {\"term\": {\"type\": \"ccr_stats\"}}}");
+        Map<String, ?> response = toMap(adminClient().performRequest(request));
+
+        int numDocs = (int) XContentMapValues.extractValue("hits.total", response);
+        assertThat(numDocs, greaterThanOrEqualTo(1));
+
+        int numberOfOperationsReceived = 0;
+        int numberOfOperationsIndexed = 0;
+
+        List<?> hits = (List<?>) XContentMapValues.extractValue("hits.hits", response);
+        for (int i = 0; i < numDocs; i++) {
+            Map<?, ?> hit = (Map<?, ?>) hits.get(i);
+            String leaderIndex = (String) XContentMapValues.extractValue("_source.ccr_stats.leader_index", hit);
+            if (leaderIndex.endsWith(expectedLeaderIndex) == false) {
+                continue;
+            }
+
+            int foundNumberOfOperationsReceived =
+                (int) XContentMapValues.extractValue("_source.ccr_stats.operations_received", hit);
+            numberOfOperationsReceived = Math.max(numberOfOperationsReceived, foundNumberOfOperationsReceived);
+            int foundNumberOfOperationsIndexed =
+                (int) XContentMapValues.extractValue("_source.ccr_stats.number_of_operations_indexed", hit);
+            numberOfOperationsIndexed = Math.max(numberOfOperationsIndexed, foundNumberOfOperationsIndexed);
+        }
+
+        assertThat(numberOfOperationsReceived, greaterThanOrEqualTo(1));
+        assertThat(numberOfOperationsIndexed, greaterThanOrEqualTo(1));
+    }
+
+    private static void ensureYellow(String index) throws IOException {
+        Request request = new Request("GET", "/_cluster/health/" + index);
+        request.addParameter("wait_for_status", "yellow");
+        request.addParameter("wait_for_no_relocating_shards", "true");
+        request.addParameter("timeout", "70s");
+        request.addParameter("level", "shards");
+        adminClient().performRequest(request);
+    }
+
 }

+ 1 - 0
x-pack/plugin/ccr/qa/multi-cluster/build.gradle

@@ -27,6 +27,7 @@ followClusterTestCluster {
     dependsOn leaderClusterTestRunner
     numNodes = 1
     clusterName = 'follow-cluster'
+    setting 'xpack.monitoring.collection.enabled', 'true'
     setting 'xpack.license.self_generated.type', 'trial'
     setting 'cluster.remote.leader_cluster.seeds', "\"${-> leaderClusterTest.nodes.get(0).transportUri()}\""
 }

+ 36 - 0
x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java

@@ -25,6 +25,7 @@ import java.util.Map;
 
 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 
 public class FollowIndexIT extends ESRestTestCase {
 
@@ -75,6 +76,7 @@ public class FollowIndexIT extends ESRestTestCase {
                 index(leaderClient, leaderIndexName, Integer.toString(id + 2), "field", id + 2, "filtered_field", "true");
             }
             assertBusy(() -> verifyDocuments(followIndexName, numDocs + 3));
+            assertBusy(() -> verifyCcrMonitoring(leaderIndexName));
         }
     }
 
@@ -104,6 +106,7 @@ public class FollowIndexIT extends ESRestTestCase {
             ensureYellow("logs-20190101");
             verifyDocuments("logs-20190101", 5);
         });
+        assertBusy(() -> verifyCcrMonitoring("logs-20190101"));
     }
 
     private static void index(RestClient client, String index, String id, Object... fields) throws IOException {
@@ -155,6 +158,39 @@ public class FollowIndexIT extends ESRestTestCase {
         }
     }
 
+    private static void verifyCcrMonitoring(String expectedLeaderIndex) throws IOException {
+        ensureYellow(".monitoring-*");
+
+        Request request = new Request("GET", "/.monitoring-*/_search");
+        request.setJsonEntity("{\"query\": {\"term\": {\"type\": \"ccr_stats\"}}}");
+        Map<String, ?> response = toMap(client().performRequest(request));
+
+        int numDocs = (int) XContentMapValues.extractValue("hits.total", response);
+        assertThat(numDocs, greaterThanOrEqualTo(1));
+
+        int numberOfOperationsReceived = 0;
+        int numberOfOperationsIndexed = 0;
+
+        List<?> hits = (List<?>) XContentMapValues.extractValue("hits.hits", response);
+        for (int i = 0; i < numDocs; i++) {
+            Map<?, ?> hit = (Map<?, ?>) hits.get(i);
+            String leaderIndex = (String) XContentMapValues.extractValue("_source.ccr_stats.leader_index", hit);
+            if (leaderIndex.endsWith(expectedLeaderIndex) == false) {
+                continue;
+            }
+
+            int foundNumberOfOperationsReceived =
+                (int) XContentMapValues.extractValue("_source.ccr_stats.operations_received", hit);
+            numberOfOperationsReceived = Math.max(numberOfOperationsReceived, foundNumberOfOperationsReceived);
+            int foundNumberOfOperationsIndexed =
+                (int) XContentMapValues.extractValue("_source.ccr_stats.number_of_operations_indexed", hit);
+            numberOfOperationsIndexed = Math.max(numberOfOperationsIndexed, foundNumberOfOperationsIndexed);
+        }
+
+        assertThat(numberOfOperationsReceived, greaterThanOrEqualTo(1));
+        assertThat(numberOfOperationsIndexed, greaterThanOrEqualTo(1));
+    }
+
     private static Map<String, Object> toMap(Response response) throws IOException {
         return toMap(EntityUtils.toString(response.getEntity()));
     }

+ 8 - 8
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java

@@ -40,21 +40,17 @@ import org.elasticsearch.threadpool.FixedExecutorBuilder;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.watcher.ResourceWatcherService;
 import org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator;
-import org.elasticsearch.xpack.ccr.action.TransportUnfollowIndexAction;
-import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction;
-import org.elasticsearch.xpack.ccr.action.TransportCreateAndFollowIndexAction;
-import org.elasticsearch.xpack.ccr.action.TransportFollowIndexAction;
-import org.elasticsearch.xpack.core.ccr.action.CreateAndFollowIndexAction;
 import org.elasticsearch.xpack.ccr.action.DeleteAutoFollowPatternAction;
-import org.elasticsearch.xpack.core.ccr.action.FollowIndexAction;
 import org.elasticsearch.xpack.ccr.action.PutAutoFollowPatternAction;
 import org.elasticsearch.xpack.ccr.action.ShardChangesAction;
 import org.elasticsearch.xpack.ccr.action.ShardFollowTask;
 import org.elasticsearch.xpack.ccr.action.ShardFollowTasksExecutor;
 import org.elasticsearch.xpack.ccr.action.TransportCcrStatsAction;
+import org.elasticsearch.xpack.ccr.action.TransportCreateAndFollowIndexAction;
 import org.elasticsearch.xpack.ccr.action.TransportDeleteAutoFollowPatternAction;
+import org.elasticsearch.xpack.ccr.action.TransportFollowIndexAction;
 import org.elasticsearch.xpack.ccr.action.TransportPutAutoFollowPatternAction;
-import org.elasticsearch.xpack.core.ccr.action.UnfollowIndexAction;
+import org.elasticsearch.xpack.ccr.action.TransportUnfollowIndexAction;
 import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsAction;
 import org.elasticsearch.xpack.ccr.action.bulk.TransportBulkShardOperationsAction;
 import org.elasticsearch.xpack.ccr.index.engine.FollowingEngineFactory;
@@ -66,6 +62,10 @@ import org.elasticsearch.xpack.ccr.rest.RestPutAutoFollowPatternAction;
 import org.elasticsearch.xpack.ccr.rest.RestUnfollowIndexAction;
 import org.elasticsearch.xpack.core.XPackPlugin;
 import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
+import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction;
+import org.elasticsearch.xpack.core.ccr.action.CreateAndFollowIndexAction;
+import org.elasticsearch.xpack.core.ccr.action.FollowIndexAction;
+import org.elasticsearch.xpack.core.ccr.action.UnfollowIndexAction;
 
 import java.util.Arrays;
 import java.util.Collection;
@@ -76,8 +76,8 @@ import java.util.Optional;
 import java.util.function.Supplier;
 
 import static java.util.Collections.emptyList;
-import static org.elasticsearch.xpack.ccr.CcrSettings.CCR_ENABLED_SETTING;
 import static org.elasticsearch.xpack.ccr.CcrSettings.CCR_FOLLOWING_INDEX_SETTING;
+import static org.elasticsearch.xpack.core.XPackSettings.CCR_ENABLED_SETTING;
 
 /**
  * Container class for CCR functionality.

+ 2 - 6
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java

@@ -8,6 +8,7 @@ package org.elasticsearch.xpack.ccr;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Setting.Property;
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.xpack.core.XPackSettings;
 
 import java.util.Arrays;
 import java.util.List;
@@ -22,11 +23,6 @@ public final class CcrSettings {
 
     }
 
-    /**
-     * Setting for controlling whether or not CCR is enabled.
-     */
-    static final Setting<Boolean> CCR_ENABLED_SETTING = Setting.boolSetting("xpack.ccr.enabled", true, Property.NodeScope);
-
     /**
      * Index setting for a following index.
      */
@@ -46,7 +42,7 @@ public final class CcrSettings {
      */
     static List<Setting<?>> getSettings() {
         return Arrays.asList(
-                CCR_ENABLED_SETTING,
+                XPackSettings.CCR_ENABLED_SETTING,
                 CCR_FOLLOWING_INDEX_SETTING,
                 CCR_AUTO_FOLLOW_POLL_INTERVAL);
     }

+ 1 - 0
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java

@@ -20,6 +20,7 @@ import org.elasticsearch.persistent.AllocatedPersistentTask;
 import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse;
 import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
+import org.elasticsearch.xpack.core.ccr.action.FollowIndexAction;
 
 import java.util.ArrayList;
 import java.util.Arrays;

+ 16 - 16
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportCcrStatsAction.java

@@ -34,8 +34,8 @@ import java.util.function.Consumer;
 
 public class TransportCcrStatsAction extends TransportTasksAction<
         ShardFollowNodeTask,
-        CcrStatsAction.TasksRequest,
-        CcrStatsAction.TasksResponse, CcrStatsAction.TaskResponse> {
+        CcrStatsAction.StatsRequest,
+        CcrStatsAction.StatsResponses, CcrStatsAction.StatsResponse> {
 
     private final IndexNameExpressionResolver resolver;
     private final CcrLicenseChecker ccrLicenseChecker;
@@ -54,8 +54,8 @@ public class TransportCcrStatsAction extends TransportTasksAction<
                 clusterService,
                 transportService,
                 actionFilters,
-                CcrStatsAction.TasksRequest::new,
-                CcrStatsAction.TasksResponse::new,
+                CcrStatsAction.StatsRequest::new,
+                CcrStatsAction.StatsResponses::new,
                 Ccr.CCR_THREAD_POOL_NAME);
         this.resolver = Objects.requireNonNull(resolver);
         this.ccrLicenseChecker = Objects.requireNonNull(ccrLicenseChecker);
@@ -64,8 +64,8 @@ public class TransportCcrStatsAction extends TransportTasksAction<
     @Override
     protected void doExecute(
             final Task task,
-            final CcrStatsAction.TasksRequest request,
-            final ActionListener<CcrStatsAction.TasksResponse> listener) {
+            final CcrStatsAction.StatsRequest request,
+            final ActionListener<CcrStatsAction.StatsResponses> listener) {
         if (ccrLicenseChecker.isCcrAllowed() == false) {
             listener.onFailure(LicenseUtils.newComplianceException("ccr"));
             return;
@@ -74,21 +74,21 @@ public class TransportCcrStatsAction extends TransportTasksAction<
     }
 
     @Override
-    protected CcrStatsAction.TasksResponse newResponse(
-            final CcrStatsAction.TasksRequest request,
-            final List<CcrStatsAction.TaskResponse> taskResponses,
+    protected CcrStatsAction.StatsResponses newResponse(
+            final CcrStatsAction.StatsRequest request,
+            final List<CcrStatsAction.StatsResponse> statsRespons,
             final List<TaskOperationFailure> taskOperationFailures,
             final List<FailedNodeException> failedNodeExceptions) {
-        return new CcrStatsAction.TasksResponse(taskOperationFailures, failedNodeExceptions, taskResponses);
+        return new CcrStatsAction.StatsResponses(taskOperationFailures, failedNodeExceptions, statsRespons);
     }
 
     @Override
-    protected CcrStatsAction.TaskResponse readTaskResponse(final StreamInput in) throws IOException {
-        return new CcrStatsAction.TaskResponse(in);
+    protected CcrStatsAction.StatsResponse readTaskResponse(final StreamInput in) throws IOException {
+        return new CcrStatsAction.StatsResponse(in);
     }
 
     @Override
-    protected void processTasks(final CcrStatsAction.TasksRequest request, final Consumer<ShardFollowNodeTask> operation) {
+    protected void processTasks(final CcrStatsAction.StatsRequest request, final Consumer<ShardFollowNodeTask> operation) {
         final ClusterState state = clusterService.state();
         final Set<String> concreteIndices = new HashSet<>(Arrays.asList(resolver.concreteIndexNames(state, request)));
         for (final Task task : taskManager.getTasks().values()) {
@@ -103,10 +103,10 @@ public class TransportCcrStatsAction extends TransportTasksAction<
 
     @Override
     protected void taskOperation(
-            final CcrStatsAction.TasksRequest request,
+            final CcrStatsAction.StatsRequest request,
             final ShardFollowNodeTask task,
-            final ActionListener<CcrStatsAction.TaskResponse> listener) {
-        listener.onResponse(new CcrStatsAction.TaskResponse(task.getFollowShardId(), task.getStatus()));
+            final ActionListener<CcrStatsAction.StatsResponse> listener) {
+        listener.onResponse(new CcrStatsAction.StatsResponse(task.getFollowShardId(), task.getStatus()));
     }
 
 }

+ 1 - 1
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestCcrStatsAction.java

@@ -33,7 +33,7 @@ public class RestCcrStatsAction extends BaseRestHandler {
 
     @Override
     protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) throws IOException {
-        final CcrStatsAction.TasksRequest request = new CcrStatsAction.TasksRequest();
+        final CcrStatsAction.StatsRequest request = new CcrStatsAction.StatsRequest();
         request.setIndices(Strings.splitStringByCommaToArray(restRequest.param("index")));
         request.setIndicesOptions(IndicesOptions.fromRequest(restRequest, request.indicesOptions()));
         return channel -> client.execute(CcrStatsAction.INSTANCE, request, new RestToXContentListener<>(channel));

+ 2 - 2
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java

@@ -90,9 +90,9 @@ public class CcrLicenseIT extends ESSingleNodeTestCase {
 
     public void testThatCcrStatsAreUnavailableWithNonCompliantLicense() throws InterruptedException {
         final CountDownLatch latch = new CountDownLatch(1);
-        client().execute(CcrStatsAction.INSTANCE, new CcrStatsAction.TasksRequest(), new ActionListener<CcrStatsAction.TasksResponse>() {
+        client().execute(CcrStatsAction.INSTANCE, new CcrStatsAction.StatsRequest(), new ActionListener<CcrStatsAction.StatsResponses>() {
             @Override
-            public void onResponse(final CcrStatsAction.TasksResponse tasksResponse) {
+            public void onResponse(final CcrStatsAction.StatsResponses statsResponses) {
                 latch.countDown();
                 fail();
             }

+ 6 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackSettings.java

@@ -35,6 +35,12 @@ public class XPackSettings {
         throw new IllegalStateException("Utility class should not be instantiated");
     }
 
+
+    /**
+     * Setting for controlling whether or not CCR is enabled.
+     */
+    public static final Setting<Boolean> CCR_ENABLED_SETTING = Setting.boolSetting("xpack.ccr.enabled", true, Property.NodeScope);
+
     /** Setting for enabling or disabling security. Defaults to true. */
     public static final Setting<Boolean> SECURITY_ENABLED = Setting.boolSetting("xpack.security.enabled", true, Setting.Property.NodeScope);
 

+ 50 - 45
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/ShardFollowNodeTaskStatus.java

@@ -369,58 +369,63 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
     public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException {
         builder.startObject();
         {
-            builder.field(LEADER_INDEX.getPreferredName(), leaderIndex);
-            builder.field(SHARD_ID.getPreferredName(), shardId);
-            builder.field(LEADER_GLOBAL_CHECKPOINT_FIELD.getPreferredName(), leaderGlobalCheckpoint);
-            builder.field(LEADER_MAX_SEQ_NO_FIELD.getPreferredName(), leaderMaxSeqNo);
-            builder.field(FOLLOWER_GLOBAL_CHECKPOINT_FIELD.getPreferredName(), followerGlobalCheckpoint);
-            builder.field(FOLLOWER_MAX_SEQ_NO_FIELD.getPreferredName(), followerMaxSeqNo);
-            builder.field(LAST_REQUESTED_SEQ_NO_FIELD.getPreferredName(), lastRequestedSeqNo);
-            builder.field(NUMBER_OF_CONCURRENT_READS_FIELD.getPreferredName(), numberOfConcurrentReads);
-            builder.field(NUMBER_OF_CONCURRENT_WRITES_FIELD.getPreferredName(), numberOfConcurrentWrites);
-            builder.field(NUMBER_OF_QUEUED_WRITES_FIELD.getPreferredName(), numberOfQueuedWrites);
-            builder.field(MAPPING_VERSION_FIELD.getPreferredName(), mappingVersion);
-            builder.humanReadableField(
-                    TOTAL_FETCH_TIME_MILLIS_FIELD.getPreferredName(),
-                    "total_fetch_time",
-                    new TimeValue(totalFetchTimeMillis, TimeUnit.MILLISECONDS));
-            builder.field(NUMBER_OF_SUCCESSFUL_FETCHES_FIELD.getPreferredName(), numberOfSuccessfulFetches);
-            builder.field(NUMBER_OF_FAILED_FETCHES_FIELD.getPreferredName(), numberOfFailedFetches);
-            builder.field(OPERATIONS_RECEIVED_FIELD.getPreferredName(), operationsReceived);
-            builder.humanReadableField(
-                    TOTAL_TRANSFERRED_BYTES.getPreferredName(),
-                    "total_transferred",
-                    new ByteSizeValue(totalTransferredBytes, ByteSizeUnit.BYTES));
-            builder.humanReadableField(
-                    TOTAL_INDEX_TIME_MILLIS_FIELD.getPreferredName(),
-                    "total_index_time",
-                    new TimeValue(totalIndexTimeMillis, TimeUnit.MILLISECONDS));
-            builder.field(NUMBER_OF_SUCCESSFUL_BULK_OPERATIONS_FIELD.getPreferredName(), numberOfSuccessfulBulkOperations);
-            builder.field(NUMBER_OF_FAILED_BULK_OPERATIONS_FIELD.getPreferredName(), numberOfFailedBulkOperations);
-            builder.field(NUMBER_OF_OPERATIONS_INDEXED_FIELD.getPreferredName(), numberOfOperationsIndexed);
-            builder.startArray(FETCH_EXCEPTIONS.getPreferredName());
-            {
-                for (final Map.Entry<Long, ElasticsearchException> entry : fetchExceptions.entrySet()) {
+            toXContentFragment(builder, params);
+        }
+        builder.endObject();
+        return builder;
+    }
+
+    public XContentBuilder toXContentFragment(final XContentBuilder builder, final Params params) throws IOException {
+        builder.field(LEADER_INDEX.getPreferredName(), leaderIndex);
+        builder.field(SHARD_ID.getPreferredName(), shardId);
+        builder.field(LEADER_GLOBAL_CHECKPOINT_FIELD.getPreferredName(), leaderGlobalCheckpoint);
+        builder.field(LEADER_MAX_SEQ_NO_FIELD.getPreferredName(), leaderMaxSeqNo);
+        builder.field(FOLLOWER_GLOBAL_CHECKPOINT_FIELD.getPreferredName(), followerGlobalCheckpoint);
+        builder.field(FOLLOWER_MAX_SEQ_NO_FIELD.getPreferredName(), followerMaxSeqNo);
+        builder.field(LAST_REQUESTED_SEQ_NO_FIELD.getPreferredName(), lastRequestedSeqNo);
+        builder.field(NUMBER_OF_CONCURRENT_READS_FIELD.getPreferredName(), numberOfConcurrentReads);
+        builder.field(NUMBER_OF_CONCURRENT_WRITES_FIELD.getPreferredName(), numberOfConcurrentWrites);
+        builder.field(NUMBER_OF_QUEUED_WRITES_FIELD.getPreferredName(), numberOfQueuedWrites);
+        builder.field(MAPPING_VERSION_FIELD.getPreferredName(), mappingVersion);
+        builder.humanReadableField(
+                TOTAL_FETCH_TIME_MILLIS_FIELD.getPreferredName(),
+                "total_fetch_time",
+                new TimeValue(totalFetchTimeMillis, TimeUnit.MILLISECONDS));
+        builder.field(NUMBER_OF_SUCCESSFUL_FETCHES_FIELD.getPreferredName(), numberOfSuccessfulFetches);
+        builder.field(NUMBER_OF_FAILED_FETCHES_FIELD.getPreferredName(), numberOfFailedFetches);
+        builder.field(OPERATIONS_RECEIVED_FIELD.getPreferredName(), operationsReceived);
+        builder.humanReadableField(
+                TOTAL_TRANSFERRED_BYTES.getPreferredName(),
+                "total_transferred",
+                new ByteSizeValue(totalTransferredBytes, ByteSizeUnit.BYTES));
+        builder.humanReadableField(
+                TOTAL_INDEX_TIME_MILLIS_FIELD.getPreferredName(),
+                "total_index_time",
+                new TimeValue(totalIndexTimeMillis, TimeUnit.MILLISECONDS));
+        builder.field(NUMBER_OF_SUCCESSFUL_BULK_OPERATIONS_FIELD.getPreferredName(), numberOfSuccessfulBulkOperations);
+        builder.field(NUMBER_OF_FAILED_BULK_OPERATIONS_FIELD.getPreferredName(), numberOfFailedBulkOperations);
+        builder.field(NUMBER_OF_OPERATIONS_INDEXED_FIELD.getPreferredName(), numberOfOperationsIndexed);
+        builder.startArray(FETCH_EXCEPTIONS.getPreferredName());
+        {
+            for (final Map.Entry<Long, ElasticsearchException> entry : fetchExceptions.entrySet()) {
+                builder.startObject();
+                {
+                    builder.field(FETCH_EXCEPTIONS_ENTRY_FROM_SEQ_NO.getPreferredName(), entry.getKey());
+                    builder.field(FETCH_EXCEPTIONS_ENTRY_EXCEPTION.getPreferredName());
                     builder.startObject();
                     {
-                        builder.field(FETCH_EXCEPTIONS_ENTRY_FROM_SEQ_NO.getPreferredName(), entry.getKey());
-                        builder.field(FETCH_EXCEPTIONS_ENTRY_EXCEPTION.getPreferredName());
-                        builder.startObject();
-                        {
-                            ElasticsearchException.generateThrowableXContent(builder, params, entry.getValue());
-                        }
-                        builder.endObject();
+                        ElasticsearchException.generateThrowableXContent(builder, params, entry.getValue());
                     }
                     builder.endObject();
                 }
+                builder.endObject();
             }
-            builder.endArray();
-            builder.humanReadableField(
-                    TIME_SINCE_LAST_FETCH_MILLIS_FIELD.getPreferredName(),
-                    "time_since_last_fetch",
-                    new TimeValue(timeSinceLastFetchMillis, TimeUnit.MILLISECONDS));
         }
-        builder.endObject();
+        builder.endArray();
+        builder.humanReadableField(
+                TIME_SINCE_LAST_FETCH_MILLIS_FIELD.getPreferredName(),
+                "time_since_last_fetch",
+                new TimeValue(timeSinceLastFetchMillis, TimeUnit.MILLISECONDS));
         return builder;
     }
 

+ 25 - 21
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/CcrStatsAction.java

@@ -29,7 +29,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
-public class CcrStatsAction extends Action<CcrStatsAction.TasksResponse> {
+public class CcrStatsAction extends Action<CcrStatsAction.StatsResponses> {
 
     public static final String NAME = "cluster:monitor/ccr/stats";
 
@@ -40,41 +40,45 @@ public class CcrStatsAction extends Action<CcrStatsAction.TasksResponse> {
     }
 
     @Override
-    public TasksResponse newResponse() {
-        return new TasksResponse();
+    public StatsResponses newResponse() {
+        return new StatsResponses();
     }
 
-    public static class TasksResponse extends BaseTasksResponse implements ToXContentObject {
+    public static class StatsResponses extends BaseTasksResponse implements ToXContentObject {
 
-        private final List<TaskResponse> taskResponses;
+        private final List<StatsResponse> statsResponse;
 
-        public TasksResponse() {
+        public List<StatsResponse> getStatsResponses() {
+            return statsResponse;
+        }
+
+        public StatsResponses() {
             this(Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
         }
 
-        public TasksResponse(
+        public StatsResponses(
                 final List<TaskOperationFailure> taskFailures,
                 final List<? extends FailedNodeException> nodeFailures,
-                final List<TaskResponse> taskResponses) {
+                final List<StatsResponse> statsResponse) {
             super(taskFailures, nodeFailures);
-            this.taskResponses = taskResponses;
+            this.statsResponse = statsResponse;
         }
 
         @Override
         public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException {
             // sort by index name, then shard ID
-            final Map<String, Map<Integer, TaskResponse>> taskResponsesByIndex = new TreeMap<>();
-            for (final TaskResponse taskResponse : taskResponses) {
+            final Map<String, Map<Integer, StatsResponse>> taskResponsesByIndex = new TreeMap<>();
+            for (final StatsResponse statsResponse : statsResponse) {
                 taskResponsesByIndex.computeIfAbsent(
-                        taskResponse.followerShardId().getIndexName(),
-                        k -> new TreeMap<>()).put(taskResponse.followerShardId().getId(), taskResponse);
+                        statsResponse.followerShardId().getIndexName(),
+                        k -> new TreeMap<>()).put(statsResponse.followerShardId().getId(), statsResponse);
             }
             builder.startObject();
             {
-                for (final Map.Entry<String, Map<Integer, TaskResponse>> index : taskResponsesByIndex.entrySet()) {
+                for (final Map.Entry<String, Map<Integer, StatsResponse>> index : taskResponsesByIndex.entrySet()) {
                     builder.startArray(index.getKey());
                     {
-                        for (final Map.Entry<Integer, TaskResponse> shard : index.getValue().entrySet()) {
+                        for (final Map.Entry<Integer, StatsResponse> shard : index.getValue().entrySet()) {
                             shard.getValue().status().toXContent(builder, params);
                         }
                     }
@@ -86,7 +90,7 @@ public class CcrStatsAction extends Action<CcrStatsAction.TasksResponse> {
         }
     }
 
-    public static class TasksRequest extends BaseTasksRequest<TasksRequest> implements IndicesRequest {
+    public static class StatsRequest extends BaseTasksRequest<StatsRequest> implements IndicesRequest {
 
         private String[] indices;
 
@@ -144,26 +148,26 @@ public class CcrStatsAction extends Action<CcrStatsAction.TasksResponse> {
 
     }
 
-    public static class TaskResponse implements Writeable {
+    public static class StatsResponse implements Writeable {
 
         private final ShardId followerShardId;
 
-        ShardId followerShardId() {
+        public ShardId followerShardId() {
             return followerShardId;
         }
 
         private final ShardFollowNodeTaskStatus status;
 
-        ShardFollowNodeTaskStatus status() {
+        public ShardFollowNodeTaskStatus status() {
             return status;
         }
 
-        public TaskResponse(final ShardId followerShardId, final ShardFollowNodeTaskStatus status) {
+        public StatsResponse(final ShardId followerShardId, final ShardFollowNodeTaskStatus status) {
             this.followerShardId = followerShardId;
             this.status = status;
         }
 
-        public TaskResponse(final StreamInput in) throws IOException {
+        public StatsResponse(final StreamInput in) throws IOException {
             this.followerShardId = ShardId.readShardId(in);
             this.status = new ShardFollowNodeTaskStatus(in);
         }

+ 4 - 4
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/client/CcrClient.java

@@ -49,13 +49,13 @@ public class CcrClient {
     }
 
     public void stats(
-            final CcrStatsAction.TasksRequest request,
-            final ActionListener<CcrStatsAction.TasksResponse> listener) {
+            final CcrStatsAction.StatsRequest request,
+            final ActionListener<CcrStatsAction.StatsResponses> listener) {
         client.execute(CcrStatsAction.INSTANCE, request, listener);
     }
 
-    public ActionFuture<CcrStatsAction.TasksResponse> stats(final CcrStatsAction.TasksRequest request) {
-        final PlainActionFuture<CcrStatsAction.TasksResponse> listener = PlainActionFuture.newFuture();
+    public ActionFuture<CcrStatsAction.StatsResponses> stats(final CcrStatsAction.StatsRequest request) {
+        final PlainActionFuture<CcrStatsAction.StatsResponses> listener = PlainActionFuture.newFuture();
         client.execute(CcrStatsAction.INSTANCE, request, listener);
         return listener;
     }

+ 3 - 0
x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/Monitoring.java

@@ -39,6 +39,7 @@ import org.elasticsearch.xpack.core.ssl.SSLService;
 import org.elasticsearch.xpack.monitoring.action.TransportMonitoringBulkAction;
 import org.elasticsearch.xpack.monitoring.cleaner.CleanerService;
 import org.elasticsearch.xpack.monitoring.collector.Collector;
+import org.elasticsearch.xpack.monitoring.collector.ccr.CcrStatsCollector;
 import org.elasticsearch.xpack.monitoring.collector.cluster.ClusterStatsCollector;
 import org.elasticsearch.xpack.monitoring.collector.indices.IndexRecoveryCollector;
 import org.elasticsearch.xpack.monitoring.collector.indices.IndexStatsCollector;
@@ -142,6 +143,7 @@ public class Monitoring extends Plugin implements ActionPlugin {
         collectors.add(new NodeStatsCollector(settings, clusterService, getLicenseState(), client));
         collectors.add(new IndexRecoveryCollector(settings, clusterService, getLicenseState(), client));
         collectors.add(new JobStatsCollector(settings, clusterService, getLicenseState(), client));
+        collectors.add(new CcrStatsCollector(settings, clusterService, getLicenseState(), client));
 
         final MonitoringService monitoringService = new MonitoringService(settings, clusterService, threadPool, collectors, exporters);
 
@@ -179,6 +181,7 @@ public class Monitoring extends Plugin implements ActionPlugin {
         settings.add(IndexRecoveryCollector.INDEX_RECOVERY_ACTIVE_ONLY);
         settings.add(IndexStatsCollector.INDEX_STATS_TIMEOUT);
         settings.add(JobStatsCollector.JOB_STATS_TIMEOUT);
+        settings.add(CcrStatsCollector.CCR_STATS_TIMEOUT);
         settings.add(NodeStatsCollector.NODE_STATS_TIMEOUT);
         settings.addAll(Exporters.getSettings());
         return Collections.unmodifiableList(settings);

+ 89 - 0
x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsCollector.java

@@ -0,0 +1,89 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.monitoring.collector.ccr;
+
+import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.settings.Setting;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.concurrent.ThreadContext;
+import org.elasticsearch.license.XPackLicenseState;
+import org.elasticsearch.xpack.core.XPackClient;
+import org.elasticsearch.xpack.core.XPackSettings;
+import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction;
+import org.elasticsearch.xpack.core.ccr.client.CcrClient;
+import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc;
+import org.elasticsearch.xpack.monitoring.collector.Collector;
+
+import java.util.Collection;
+import java.util.stream.Collectors;
+
+import static org.elasticsearch.xpack.core.ClientHelper.MONITORING_ORIGIN;
+import static org.elasticsearch.xpack.core.ClientHelper.stashWithOrigin;
+import static org.elasticsearch.xpack.monitoring.collector.ccr.CcrStatsMonitoringDoc.TYPE;
+
+public class CcrStatsCollector extends Collector {
+
+    public static final Setting<TimeValue> CCR_STATS_TIMEOUT = collectionTimeoutSetting("ccr.stats.timeout");
+
+    private final ThreadContext threadContext;
+    private final CcrClient ccrClient;
+
+    public CcrStatsCollector(
+            final Settings settings,
+            final ClusterService clusterService,
+            final XPackLicenseState licenseState,
+            final Client client) {
+        this(settings, clusterService, licenseState, new XPackClient(client).ccr(), client.threadPool().getThreadContext());
+    }
+
+    CcrStatsCollector(
+            final Settings settings,
+            final ClusterService clusterService,
+            final XPackLicenseState licenseState,
+            final CcrClient ccrClient,
+            final ThreadContext threadContext) {
+        super(settings, TYPE, clusterService, CCR_STATS_TIMEOUT, licenseState);
+        this.ccrClient = ccrClient;
+        this.threadContext = threadContext;
+    }
+
+    @Override
+    protected boolean shouldCollect(final boolean isElectedMaster) {
+        // this can only run when monitoring is allowed and CCR is enabled and allowed, but also only on the elected master node
+        return isElectedMaster
+                && super.shouldCollect(isElectedMaster)
+                && XPackSettings.CCR_ENABLED_SETTING.get(settings)
+                && licenseState.isCcrAllowed();
+    }
+
+
+    @Override
+    protected Collection<MonitoringDoc> doCollect(
+            final MonitoringDoc.Node node,
+            final long interval,
+            final ClusterState clusterState) throws Exception {
+        try (ThreadContext.StoredContext ignore = stashWithOrigin(threadContext, MONITORING_ORIGIN)) {
+            final CcrStatsAction.StatsRequest request = new CcrStatsAction.StatsRequest();
+            request.setIndices(Strings.EMPTY_ARRAY);
+            final CcrStatsAction.StatsResponses responses = ccrClient.stats(request).actionGet(getCollectionTimeout());
+
+            final long timestamp = timestamp();
+            final String clusterUuid = clusterUuid(clusterState);
+
+            return responses
+                    .getStatsResponses()
+                    .stream()
+                    .map(stats -> new CcrStatsMonitoringDoc(clusterUuid, timestamp, interval, node, stats.status()))
+                    .collect(Collectors.toList());
+        }
+    }
+
+}

+ 47 - 0
x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsMonitoringDoc.java

@@ -0,0 +1,47 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.monitoring.collector.ccr;
+
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
+import org.elasticsearch.xpack.core.monitoring.MonitoredSystem;
+import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc;
+
+import java.io.IOException;
+import java.util.Objects;
+
+public class CcrStatsMonitoringDoc extends MonitoringDoc {
+
+    public static final String TYPE = "ccr_stats";
+
+    private final ShardFollowNodeTaskStatus status;
+
+    public ShardFollowNodeTaskStatus status() {
+        return status;
+    }
+
+    public CcrStatsMonitoringDoc(
+            final String cluster,
+            final long timestamp,
+            final long intervalMillis,
+            final MonitoringDoc.Node node,
+            final ShardFollowNodeTaskStatus status) {
+        super(cluster, timestamp, intervalMillis, node, MonitoredSystem.ES, TYPE, null);
+        this.status = Objects.requireNonNull(status, "status");
+    }
+
+
+    @Override
+    protected void innerToXContent(final XContentBuilder builder, final Params params) throws IOException {
+        builder.startObject(TYPE);
+        {
+            status.toXContentFragment(builder, params);
+        }
+        builder.endObject();
+    }
+
+}

+ 217 - 0
x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsCollectorTests.java

@@ -0,0 +1,217 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.monitoring.collector.ccr;
+
+import org.elasticsearch.action.ActionFuture;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.concurrent.ThreadContext;
+import org.elasticsearch.xpack.core.XPackSettings;
+import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
+import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction;
+import org.elasticsearch.xpack.core.ccr.client.CcrClient;
+import org.elasticsearch.xpack.core.monitoring.MonitoredSystem;
+import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc;
+import org.elasticsearch.xpack.monitoring.BaseCollectorTestCase;
+import org.mockito.ArgumentMatcher;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import static java.util.Collections.emptyList;
+import static org.elasticsearch.xpack.monitoring.MonitoringTestUtils.randomMonitoringNode;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class CcrStatsCollectorTests extends BaseCollectorTestCase {
+
+    public void testShouldCollectReturnsFalseIfMonitoringNotAllowed() {
+        final Settings settings = randomFrom(ccrEnabledSettings(), ccrDisabledSettings());
+        final boolean ccrAllowed = randomBoolean();
+        final boolean isElectedMaster = randomBoolean();
+        whenLocalNodeElectedMaster(isElectedMaster);
+
+        // this controls the blockage
+        when(licenseState.isMonitoringAllowed()).thenReturn(false);
+        when(licenseState.isCcrAllowed()).thenReturn(ccrAllowed);
+
+        final CcrStatsCollector collector = new CcrStatsCollector(settings, clusterService, licenseState, client);
+
+        assertThat(collector.shouldCollect(isElectedMaster), is(false));
+        if (isElectedMaster) {
+            verify(licenseState).isMonitoringAllowed();
+        }
+    }
+
+    public void testShouldCollectReturnsFalseIfNotMaster() {
+        // regardless of CCR being enabled
+        final Settings settings = randomFrom(ccrEnabledSettings(), ccrDisabledSettings());
+
+        when(licenseState.isMonitoringAllowed()).thenReturn(randomBoolean());
+        when(licenseState.isCcrAllowed()).thenReturn(randomBoolean());
+        // this controls the blockage
+        final boolean isElectedMaster = false;
+
+        final CcrStatsCollector collector = new CcrStatsCollector(settings, clusterService, licenseState, client);
+
+        assertThat(collector.shouldCollect(isElectedMaster), is(false));
+    }
+
+    public void testShouldCollectReturnsFalseIfCCRIsDisabled() {
+        // this is controls the blockage
+        final Settings settings = ccrDisabledSettings();
+
+        when(licenseState.isMonitoringAllowed()).thenReturn(randomBoolean());
+        when(licenseState.isCcrAllowed()).thenReturn(randomBoolean());
+
+        final boolean isElectedMaster = randomBoolean();
+        whenLocalNodeElectedMaster(isElectedMaster);
+
+        final CcrStatsCollector collector = new CcrStatsCollector(settings, clusterService, licenseState, client);
+
+        assertThat(collector.shouldCollect(isElectedMaster), is(false));
+
+        if (isElectedMaster) {
+            verify(licenseState).isMonitoringAllowed();
+        }
+    }
+
+    public void testShouldCollectReturnsFalseIfCCRIsNotAllowed() {
+        final Settings settings = randomFrom(ccrEnabledSettings(), ccrDisabledSettings());
+
+        when(licenseState.isMonitoringAllowed()).thenReturn(randomBoolean());
+        // this is controls the blockage
+        when(licenseState.isCcrAllowed()).thenReturn(false);
+        final boolean isElectedMaster = randomBoolean();
+        whenLocalNodeElectedMaster(isElectedMaster);
+
+        final CcrStatsCollector collector = new CcrStatsCollector(settings, clusterService, licenseState, client);
+
+        assertThat(collector.shouldCollect(isElectedMaster), is(false));
+
+        if (isElectedMaster) {
+            verify(licenseState).isMonitoringAllowed();
+        }
+    }
+
+    public void testShouldCollectReturnsTrue() {
+        final Settings settings = ccrEnabledSettings();
+
+        when(licenseState.isMonitoringAllowed()).thenReturn(true);
+        when(licenseState.isCcrAllowed()).thenReturn(true);
+        final boolean isElectedMaster = true;
+
+        final CcrStatsCollector collector = new CcrStatsCollector(settings, clusterService, licenseState, client);
+
+        assertThat(collector.shouldCollect(isElectedMaster), is(true));
+
+        verify(licenseState).isMonitoringAllowed();
+    }
+
+    public void testDoCollect() throws Exception {
+        final String clusterUuid = randomAlphaOfLength(5);
+        whenClusterStateWithUUID(clusterUuid);
+
+        final MonitoringDoc.Node node = randomMonitoringNode(random());
+        final CcrClient client = mock(CcrClient.class);
+        final ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
+
+        final TimeValue timeout = TimeValue.timeValueSeconds(randomIntBetween(1, 120));
+        withCollectionTimeout(CcrStatsCollector.CCR_STATS_TIMEOUT, timeout);
+
+        final CcrStatsCollector collector = new CcrStatsCollector(Settings.EMPTY, clusterService, licenseState, client, threadContext);
+        assertEquals(timeout, collector.getCollectionTimeout());
+
+        final List<CcrStatsAction.StatsResponse> statuses = mockStatuses();
+
+        @SuppressWarnings("unchecked")
+        final ActionFuture<CcrStatsAction.StatsResponses> future = (ActionFuture<CcrStatsAction.StatsResponses>)mock(ActionFuture.class);
+        final CcrStatsAction.StatsResponses responses = new CcrStatsAction.StatsResponses(emptyList(), emptyList(), statuses);
+
+        final CcrStatsAction.StatsRequest request = new CcrStatsAction.StatsRequest();
+        request.setIndices(Strings.EMPTY_ARRAY);
+        when(client.stats(statsRequestEq(request))).thenReturn(future);
+        when(future.actionGet(timeout)).thenReturn(responses);
+
+        final long interval = randomNonNegativeLong();
+
+        final Collection<MonitoringDoc> documents = collector.doCollect(node, interval, clusterState);
+        verify(clusterState).metaData();
+        verify(metaData).clusterUUID();
+
+        assertThat(documents, hasSize(statuses.size()));
+
+        int index = 0;
+        for (final Iterator<MonitoringDoc> it = documents.iterator(); it.hasNext(); index++) {
+            final CcrStatsMonitoringDoc document = (CcrStatsMonitoringDoc)it.next();
+            final CcrStatsAction.StatsResponse status = statuses.get(index);
+
+            assertThat(document.getCluster(), is(clusterUuid));
+            assertThat(document.getTimestamp(), greaterThan(0L));
+            assertThat(document.getIntervalMillis(), equalTo(interval));
+            assertThat(document.getNode(), equalTo(node));
+            assertThat(document.getSystem(), is(MonitoredSystem.ES));
+            assertThat(document.getType(), is(CcrStatsMonitoringDoc.TYPE));
+            assertThat(document.getId(), nullValue());
+            assertThat(document.status(), is(status.status()));
+        }
+    }
+
+    private List<CcrStatsAction.StatsResponse> mockStatuses() {
+        final int count = randomIntBetween(1, 8);
+        final List<CcrStatsAction.StatsResponse> statuses = new ArrayList<>(count);
+
+        for (int i = 0; i < count; ++i) {
+            CcrStatsAction.StatsResponse statsResponse = mock(CcrStatsAction.StatsResponse.class);
+            ShardFollowNodeTaskStatus status = mock(ShardFollowNodeTaskStatus.class);
+            when(statsResponse.status()).thenReturn(status);
+            statuses.add(statsResponse);
+        }
+
+        return statuses;
+    }
+
+    private Settings ccrEnabledSettings() {
+        // since it's the default, we want to ensure we test both with/without it
+        return randomBoolean() ? Settings.EMPTY : Settings.builder().put(XPackSettings.CCR_ENABLED_SETTING.getKey(), true).build();
+    }
+
+    private Settings ccrDisabledSettings() {
+        return Settings.builder().put(XPackSettings.CCR_ENABLED_SETTING.getKey(), false).build();
+    }
+
+    private static CcrStatsAction.StatsRequest statsRequestEq(CcrStatsAction.StatsRequest expected) {
+        return argThat(new StatsRequestMatches(expected));
+    }
+
+    private static class StatsRequestMatches extends ArgumentMatcher<CcrStatsAction.StatsRequest> {
+
+        private final CcrStatsAction.StatsRequest expected;
+
+        private StatsRequestMatches(CcrStatsAction.StatsRequest expected) {
+            this.expected = expected;
+        }
+
+        @Override
+        public boolean matches(Object o) {
+            CcrStatsAction.StatsRequest actual = (CcrStatsAction.StatsRequest) o;
+            return Arrays.equals(expected.indices(), actual.indices());
+        }
+    }
+
+}

+ 175 - 0
x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsMonitoringDocTests.java

@@ -0,0 +1,175 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.monitoring.collector.ccr;
+
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.xcontent.XContentHelper;
+import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
+import org.elasticsearch.xpack.core.monitoring.MonitoredSystem;
+import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc;
+import org.elasticsearch.xpack.monitoring.exporter.BaseMonitoringDocTestCase;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.junit.Before;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasToString;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
+import static org.mockito.Mockito.mock;
+
+public class CcrStatsMonitoringDocTests extends BaseMonitoringDocTestCase<CcrStatsMonitoringDoc> {
+
+    private ShardFollowNodeTaskStatus status;
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+        status = mock(ShardFollowNodeTaskStatus.class);
+    }
+
+    public void testConstructorStatusMustNotBeNull() {
+        final NullPointerException e =
+                expectThrows(NullPointerException.class, () -> new CcrStatsMonitoringDoc(cluster, timestamp, interval, node, null));
+        assertThat(e, hasToString(containsString("status")));
+    }
+
+    @Override
+    protected CcrStatsMonitoringDoc createMonitoringDoc(
+            final String cluster,
+            final long timestamp,
+            final long interval,
+            final MonitoringDoc.Node node,
+            final MonitoredSystem system,
+            final String type,
+            final String id) {
+        return new CcrStatsMonitoringDoc(cluster, timestamp, interval, node, status);
+    }
+
+    @Override
+    protected void assertMonitoringDoc(CcrStatsMonitoringDoc document) {
+        assertThat(document.getSystem(), is(MonitoredSystem.ES));
+        assertThat(document.getType(), is(CcrStatsMonitoringDoc.TYPE));
+        assertThat(document.getId(), nullValue());
+        assertThat(document.status(), is(status));
+    }
+
+    @Override
+    public void testToXContent() throws IOException {
+        final long timestamp = System.currentTimeMillis();
+        final long intervalMillis = System.currentTimeMillis();
+        final long nodeTimestamp = System.currentTimeMillis();
+        final MonitoringDoc.Node node = new MonitoringDoc.Node("_uuid", "_host", "_addr", "_ip", "_name", nodeTimestamp);
+        // these random values do not need to be internally consistent, they are only for testing formatting
+        final int shardId = randomIntBetween(0, Integer.MAX_VALUE);
+        final long leaderGlobalCheckpoint = randomNonNegativeLong();
+        final long leaderMaxSeqNo = randomNonNegativeLong();
+        final long followerGlobalCheckpoint = randomNonNegativeLong();
+        final long followerMaxSeqNo = randomNonNegativeLong();
+        final long lastRequestedSeqNo = randomNonNegativeLong();
+        final int numberOfConcurrentReads = randomIntBetween(1, Integer.MAX_VALUE);
+        final int numberOfConcurrentWrites = randomIntBetween(1, Integer.MAX_VALUE);
+        final int numberOfQueuedWrites = randomIntBetween(0, Integer.MAX_VALUE);
+        final long mappingVersion = randomIntBetween(0, Integer.MAX_VALUE);
+        final long totalFetchTimeMillis = randomLongBetween(0, 4096);
+        final long numberOfSuccessfulFetches = randomNonNegativeLong();
+        final long numberOfFailedFetches = randomLongBetween(0, 8);
+        final long operationsReceived = randomNonNegativeLong();
+        final long totalTransferredBytes = randomNonNegativeLong();
+        final long totalIndexTimeMillis = randomNonNegativeLong();
+        final long numberOfSuccessfulBulkOperations = randomNonNegativeLong();
+        final long numberOfFailedBulkOperations = randomNonNegativeLong();
+        final long numberOfOperationsIndexed = randomNonNegativeLong();
+        final NavigableMap<Long, ElasticsearchException> fetchExceptions =
+                new TreeMap<>(Collections.singletonMap(randomNonNegativeLong(), new ElasticsearchException("shard is sad")));
+        final long timeSinceLastFetchMillis = randomNonNegativeLong();
+        final ShardFollowNodeTaskStatus status = new ShardFollowNodeTaskStatus(
+                "cluster_alias:leader_index",
+                shardId,
+                leaderGlobalCheckpoint,
+                leaderMaxSeqNo,
+                followerGlobalCheckpoint,
+                followerMaxSeqNo,
+                lastRequestedSeqNo,
+                numberOfConcurrentReads,
+                numberOfConcurrentWrites,
+                numberOfQueuedWrites,
+                mappingVersion,
+                totalFetchTimeMillis,
+                numberOfSuccessfulFetches,
+                numberOfFailedFetches,
+                operationsReceived,
+                totalTransferredBytes,
+                totalIndexTimeMillis,
+                numberOfSuccessfulBulkOperations,
+                numberOfFailedBulkOperations,
+                numberOfOperationsIndexed,
+                fetchExceptions,
+                timeSinceLastFetchMillis);
+        final CcrStatsMonitoringDoc document = new CcrStatsMonitoringDoc("_cluster", timestamp, intervalMillis, node, status);
+        final BytesReference xContent = XContentHelper.toXContent(document, XContentType.JSON, false);
+        assertThat(
+                xContent.utf8ToString(),
+                equalTo(
+                        "{"
+                                + "\"cluster_uuid\":\"_cluster\","
+                                + "\"timestamp\":\"" + new DateTime(timestamp, DateTimeZone.UTC).toString() + "\","
+                                + "\"interval_ms\":" + intervalMillis + ","
+                                + "\"type\":\"ccr_stats\","
+                                + "\"source_node\":{"
+                                        + "\"uuid\":\"_uuid\","
+                                        + "\"host\":\"_host\","
+                                        + "\"transport_address\":\"_addr\","
+                                        + "\"ip\":\"_ip\","
+                                        + "\"name\":\"_name\","
+                                        + "\"timestamp\":\"" + new DateTime(nodeTimestamp, DateTimeZone.UTC).toString() +  "\""
+                                + "},"
+                                + "\"ccr_stats\":{"
+                                        + "\"leader_index\":\"cluster_alias:leader_index\","
+                                        + "\"shard_id\":" + shardId + ","
+                                        + "\"leader_global_checkpoint\":" + leaderGlobalCheckpoint + ","
+                                        + "\"leader_max_seq_no\":" + leaderMaxSeqNo + ","
+                                        + "\"follower_global_checkpoint\":" + followerGlobalCheckpoint + ","
+                                        + "\"follower_max_seq_no\":" + followerMaxSeqNo + ","
+                                        + "\"last_requested_seq_no\":" + lastRequestedSeqNo + ","
+                                        + "\"number_of_concurrent_reads\":" + numberOfConcurrentReads + ","
+                                        + "\"number_of_concurrent_writes\":" + numberOfConcurrentWrites + ","
+                                        + "\"number_of_queued_writes\":" + numberOfQueuedWrites + ","
+                                        + "\"mapping_version\":" + mappingVersion + ","
+                                        + "\"total_fetch_time_millis\":" + totalFetchTimeMillis + ","
+                                        + "\"number_of_successful_fetches\":" + numberOfSuccessfulFetches + ","
+                                        + "\"number_of_failed_fetches\":" + numberOfFailedFetches + ","
+                                        + "\"operations_received\":" + operationsReceived + ","
+                                        + "\"total_transferred_bytes\":" + totalTransferredBytes + ","
+                                        + "\"total_index_time_millis\":" + totalIndexTimeMillis +","
+                                        + "\"number_of_successful_bulk_operations\":" + numberOfSuccessfulBulkOperations + ","
+                                        + "\"number_of_failed_bulk_operations\":" + numberOfFailedBulkOperations + ","
+                                        + "\"number_of_operations_indexed\":" + numberOfOperationsIndexed + ","
+                                        + "\"fetch_exceptions\":["
+                                                + "{"
+                                                        + "\"from_seq_no\":" + fetchExceptions.keySet().iterator().next() + ","
+                                                        + "\"exception\":{"
+                                                                + "\"type\":\"exception\","
+                                                                + "\"reason\":\"shard is sad\""
+                                                        + "}"
+                                                + "}"
+                                        + "],"
+                                        + "\"time_since_last_fetch_millis\":" + timeSinceLastFetchMillis
+                                + "}"
+                        + "}"));
+    }
+
+}