Browse Source

[CCR] Expose auto follow stats to monitoring (#33886)

Martijn van Groningen 7 years ago
parent
commit
793b2a94b4
14 changed files with 697 additions and 151 deletions
  1. 30 0
      x-pack/plugin/ccr/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java
  2. 30 1
      x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java
  3. 118 0
      x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/AbstractCcrCollectorTestCase.java
  4. 163 0
      x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/AutoFollowStatsMonitoringDocTests.java
  5. 85 0
      x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrAutoFollowStatsCollectorTests.java
  6. 7 93
      x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsCollectorTests.java
  7. 20 15
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowStats.java
  8. 12 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/client/CcrClient.java
  9. 32 0
      x-pack/plugin/core/src/main/resources/monitoring-es.json
  10. 3 0
      x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/Monitoring.java
  11. 71 0
      x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/ccr/AbstractCcrCollector.java
  12. 47 0
      x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/ccr/AutoFollowStatsMonitoringDoc.java
  13. 61 0
      x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrAutoFollowStatsCollector.java
  14. 18 42
      x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsCollector.java

+ 30 - 0
x-pack/plugin/ccr/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java

@@ -152,6 +152,10 @@ public class FollowIndexSecurityIT extends ESRestTestCase {
             verifyDocuments(adminClient(), allowedIndex, 5);
         });
         assertThat(indexExists(adminClient(), disallowedIndex), is(false));
+        assertBusy(() -> {
+            verifyCcrMonitoring(allowedIndex, allowedIndex);
+            verifyAutoFollowMonitoring();
+        });
 
         // Cleanup by deleting auto follow pattern and unfollowing:
         request = new Request("DELETE", "/_ccr/auto_follow/leader_cluster");
@@ -309,4 +313,30 @@ public class FollowIndexSecurityIT extends ESRestTestCase {
         assertThat(numberOfOperationsIndexed, greaterThanOrEqualTo(1));
     }
 
+    private static void verifyAutoFollowMonitoring() throws IOException {
+        Request request = new Request("GET", "/.monitoring-*/_search");
+        request.setJsonEntity("{\"query\": {\"term\": {\"type\": \"ccr_auto_follow_stats\"}}}");
+        Map<String, ?> response;
+        try {
+            response = toMap(adminClient().performRequest(request));
+        } catch (ResponseException e) {
+            throw new AssertionError("error while searching", e);
+        }
+
+        int numberOfSuccessfulFollowIndices = 0;
+
+        List<?> hits = (List<?>) XContentMapValues.extractValue("hits.hits", response);
+        assertThat(hits.size(), greaterThanOrEqualTo(1));
+
+        for (int i = 0; i < hits.size(); i++) {
+            Map<?, ?> hit = (Map<?, ?>) hits.get(i);
+
+            int foundNumberOfOperationsReceived =
+                (int) XContentMapValues.extractValue("_source.ccr_auto_follow_stats.number_of_successful_follow_indices", hit);
+            numberOfSuccessfulFollowIndices = Math.max(numberOfSuccessfulFollowIndices, foundNumberOfOperationsReceived);
+        }
+
+        assertThat(numberOfSuccessfulFollowIndices, greaterThanOrEqualTo(1));
+    }
+
 }

+ 30 - 1
x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java

@@ -125,7 +125,10 @@ public class FollowIndexIT extends ESRestTestCase {
             ensureYellow("logs-20190101");
             verifyDocuments("logs-20190101", 5);
         });
-        assertBusy(() -> verifyCcrMonitoring("logs-20190101", "logs-20190101"));
+        assertBusy(() -> {
+            verifyCcrMonitoring("logs-20190101", "logs-20190101");
+            verifyAutoFollowMonitoring();
+        });
     }
 
     private static void index(RestClient client, String index, String id, Object... fields) throws IOException {
@@ -213,6 +216,32 @@ public class FollowIndexIT extends ESRestTestCase {
         assertThat(numberOfOperationsIndexed, greaterThanOrEqualTo(1));
     }
 
+    private static void verifyAutoFollowMonitoring() throws IOException {
+        Request request = new Request("GET", "/.monitoring-*/_search");
+        request.setJsonEntity("{\"query\": {\"term\": {\"type\": \"ccr_auto_follow_stats\"}}}");
+        Map<String, ?> response;
+        try {
+            response = toMap(client().performRequest(request));
+        } catch (ResponseException e) {
+            throw new AssertionError("error while searching", e);
+        }
+
+        int numberOfSuccessfulFollowIndices = 0;
+
+        List<?> hits = (List<?>) XContentMapValues.extractValue("hits.hits", response);
+        assertThat(hits.size(), greaterThanOrEqualTo(1));
+
+        for (int i = 0; i < hits.size(); i++) {
+            Map<?, ?> hit = (Map<?, ?>) hits.get(i);
+
+            int foundNumberOfOperationsReceived =
+                (int) XContentMapValues.extractValue("_source.ccr_auto_follow_stats.number_of_successful_follow_indices", hit);
+            numberOfSuccessfulFollowIndices = Math.max(numberOfSuccessfulFollowIndices, foundNumberOfOperationsReceived);
+        }
+
+        assertThat(numberOfSuccessfulFollowIndices, greaterThanOrEqualTo(1));
+    }
+
     private static Map<String, Object> toMap(Response response) throws IOException {
         return toMap(EntityUtils.toString(response.getEntity()));
     }

+ 118 - 0
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/AbstractCcrCollectorTestCase.java

@@ -0,0 +1,118 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.monitoring.collector.ccr;
+
+import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.license.XPackLicenseState;
+import org.elasticsearch.xpack.core.XPackSettings;
+import org.elasticsearch.xpack.monitoring.BaseCollectorTestCase;
+
+import static org.hamcrest.Matchers.is;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public abstract class AbstractCcrCollectorTestCase extends BaseCollectorTestCase {
+
+    public void testShouldCollectReturnsFalseIfMonitoringNotAllowed() {
+        final Settings settings = randomFrom(ccrEnabledSettings(), ccrDisabledSettings());
+        final boolean ccrAllowed = randomBoolean();
+        final boolean isElectedMaster = randomBoolean();
+        whenLocalNodeElectedMaster(isElectedMaster);
+
+        // this controls the blockage
+        when(licenseState.isMonitoringAllowed()).thenReturn(false);
+        when(licenseState.isCcrAllowed()).thenReturn(ccrAllowed);
+
+        final AbstractCcrCollector collector = createCollector(settings, clusterService, licenseState, client);
+
+        assertThat(collector.shouldCollect(isElectedMaster), is(false));
+        if (isElectedMaster) {
+            verify(licenseState).isMonitoringAllowed();
+        }
+    }
+
+    public void testShouldCollectReturnsFalseIfNotMaster() {
+        // regardless of CCR being enabled
+        final Settings settings = randomFrom(ccrEnabledSettings(), ccrDisabledSettings());
+
+        when(licenseState.isMonitoringAllowed()).thenReturn(randomBoolean());
+        when(licenseState.isCcrAllowed()).thenReturn(randomBoolean());
+        // this controls the blockage
+        final boolean isElectedMaster = false;
+
+        final AbstractCcrCollector collector = createCollector(settings, clusterService, licenseState, client);
+
+        assertThat(collector.shouldCollect(isElectedMaster), is(false));
+    }
+
+    public void testShouldCollectReturnsFalseIfCCRIsDisabled() {
+        // this is controls the blockage
+        final Settings settings = ccrDisabledSettings();
+
+        when(licenseState.isMonitoringAllowed()).thenReturn(randomBoolean());
+        when(licenseState.isCcrAllowed()).thenReturn(randomBoolean());
+
+        final boolean isElectedMaster = randomBoolean();
+        whenLocalNodeElectedMaster(isElectedMaster);
+
+        final AbstractCcrCollector collector = createCollector(settings, clusterService, licenseState, client);
+
+        assertThat(collector.shouldCollect(isElectedMaster), is(false));
+
+        if (isElectedMaster) {
+            verify(licenseState).isMonitoringAllowed();
+        }
+    }
+
+    public void testShouldCollectReturnsFalseIfCCRIsNotAllowed() {
+        final Settings settings = randomFrom(ccrEnabledSettings(), ccrDisabledSettings());
+
+        when(licenseState.isMonitoringAllowed()).thenReturn(randomBoolean());
+        // this is controls the blockage
+        when(licenseState.isCcrAllowed()).thenReturn(false);
+        final boolean isElectedMaster = randomBoolean();
+        whenLocalNodeElectedMaster(isElectedMaster);
+
+        final AbstractCcrCollector collector = createCollector(settings, clusterService, licenseState, client);
+
+        assertThat(collector.shouldCollect(isElectedMaster), is(false));
+
+        if (isElectedMaster) {
+            verify(licenseState).isMonitoringAllowed();
+        }
+    }
+
+    public void testShouldCollectReturnsTrue() {
+        final Settings settings = ccrEnabledSettings();
+
+        when(licenseState.isMonitoringAllowed()).thenReturn(true);
+        when(licenseState.isCcrAllowed()).thenReturn(true);
+        final boolean isElectedMaster = true;
+
+        final AbstractCcrCollector collector = createCollector(settings, clusterService, licenseState, client);
+
+        assertThat(collector.shouldCollect(isElectedMaster), is(true));
+
+        verify(licenseState).isMonitoringAllowed();
+    }
+
+    abstract AbstractCcrCollector createCollector(Settings settings,
+                                                  ClusterService clusterService,
+                                                  XPackLicenseState licenseState,
+                                                  Client client);
+
+    private Settings ccrEnabledSettings() {
+        // since it's the default, we want to ensure we test both with/without it
+        return randomBoolean() ? Settings.EMPTY : Settings.builder().put(XPackSettings.CCR_ENABLED_SETTING.getKey(), true).build();
+    }
+
+    private Settings ccrDisabledSettings() {
+        return Settings.builder().put(XPackSettings.CCR_ENABLED_SETTING.getKey(), false).build();
+    }
+
+}

+ 163 - 0
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/AutoFollowStatsMonitoringDocTests.java

@@ -0,0 +1,163 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.monitoring.collector.ccr;
+
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentHelper;
+import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.common.xcontent.support.XContentMapValues;
+import org.elasticsearch.xpack.core.ccr.AutoFollowStats;
+import org.elasticsearch.xpack.core.monitoring.MonitoredSystem;
+import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc;
+import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringTemplateUtils;
+import org.elasticsearch.xpack.monitoring.exporter.BaseMonitoringDocTestCase;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.junit.Before;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+import static org.hamcrest.Matchers.anyOf;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
+
+public class AutoFollowStatsMonitoringDocTests extends BaseMonitoringDocTestCase<AutoFollowStatsMonitoringDoc> {
+
+    private AutoFollowStats autoFollowStats;
+
+    @Before
+    public void instantiateAutoFollowStats() {
+        autoFollowStats = new AutoFollowStats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(),
+            Collections.emptyNavigableMap());
+    }
+
+    @Override
+    protected AutoFollowStatsMonitoringDoc createMonitoringDoc(String cluster,
+                                                               long timestamp,
+                                                               long interval,
+                                                               MonitoringDoc.Node node,
+                                                               MonitoredSystem system,
+                                                               String type,
+                                                               String id) {
+        return new AutoFollowStatsMonitoringDoc(cluster, timestamp, interval, node, autoFollowStats);
+    }
+
+    @Override
+    protected void assertMonitoringDoc(AutoFollowStatsMonitoringDoc document) {
+        assertThat(document.getSystem(), is(MonitoredSystem.ES));
+        assertThat(document.getType(), is(AutoFollowStatsMonitoringDoc.TYPE));
+        assertThat(document.getId(), nullValue());
+        assertThat(document.stats(), is(autoFollowStats));
+    }
+
+    @Override
+    public void testToXContent() throws IOException {
+        final long timestamp = System.currentTimeMillis();
+        final long intervalMillis = System.currentTimeMillis();
+        final long nodeTimestamp = System.currentTimeMillis();
+        final MonitoringDoc.Node node = new MonitoringDoc.Node("_uuid", "_host", "_addr", "_ip", "_name", nodeTimestamp);
+
+        final NavigableMap<String, ElasticsearchException> recentAutoFollowExceptions =
+            new TreeMap<>(Collections.singletonMap(
+                randomAlphaOfLength(4),
+                new ElasticsearchException("cannot follow index")));
+        final AutoFollowStats autoFollowStats =
+            new AutoFollowStats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), recentAutoFollowExceptions);
+
+        final AutoFollowStatsMonitoringDoc document =
+            new AutoFollowStatsMonitoringDoc("_cluster", timestamp, intervalMillis, node, autoFollowStats);
+        final BytesReference xContent = XContentHelper.toXContent(document, XContentType.JSON, false);
+        assertThat(
+            xContent.utf8ToString(),
+            equalTo(
+                "{"
+                    + "\"cluster_uuid\":\"_cluster\","
+                    + "\"timestamp\":\"" + new DateTime(timestamp, DateTimeZone.UTC).toString() + "\","
+                    + "\"interval_ms\":" + intervalMillis + ","
+                    + "\"type\":\"ccr_auto_follow_stats\","
+                    + "\"source_node\":{"
+                        + "\"uuid\":\"_uuid\","
+                        + "\"host\":\"_host\","
+                        + "\"transport_address\":\"_addr\","
+                        + "\"ip\":\"_ip\","
+                        + "\"name\":\"_name\","
+                        + "\"timestamp\":\"" + new DateTime(nodeTimestamp, DateTimeZone.UTC).toString() +  "\""
+                    + "},"
+                    + "\"ccr_auto_follow_stats\":{"
+                        + "\"number_of_failed_follow_indices\":" + autoFollowStats.getNumberOfFailedFollowIndices() + ","
+                        + "\"number_of_failed_remote_cluster_state_requests\":" +
+                            autoFollowStats.getNumberOfFailedRemoteClusterStateRequests() + ","
+                        + "\"number_of_successful_follow_indices\":" + autoFollowStats.getNumberOfSuccessfulFollowIndices() + ","
+                        + "\"recent_auto_follow_errors\":["
+                            + "{"
+                                + "\"leader_index\":\"" + recentAutoFollowExceptions.keySet().iterator().next() + "\","
+                                + "\"auto_follow_exception\":{"
+                                    + "\"type\":\"exception\","
+                                    + "\"reason\":\"cannot follow index\""
+                                + "}"
+                            + "}"
+                        + "]"
+                    + "}"
+            + "}"));
+    }
+
+    public void testShardFollowNodeTaskStatusFieldsMapped() throws IOException {
+        final NavigableMap<String, ElasticsearchException> fetchExceptions =
+            new TreeMap<>(Collections.singletonMap("leader_index", new ElasticsearchException("cannot follow index")));
+        final AutoFollowStats status = new AutoFollowStats(1, 0, 2, fetchExceptions);
+        XContentBuilder builder = jsonBuilder();
+        builder.value(status);
+        Map<String, Object> serializedStatus = XContentHelper.convertToMap(XContentType.JSON.xContent(), Strings.toString(builder), false);
+
+        Map<String, Object> template =
+            XContentHelper.convertToMap(XContentType.JSON.xContent(), MonitoringTemplateUtils.loadTemplate("es"), false);
+        Map<?, ?> autoFollowStatsMapping =
+            (Map<?, ?>) XContentMapValues.extractValue("mappings.doc.properties.ccr_auto_follow_stats.properties", template);
+
+        assertThat(serializedStatus.size(), equalTo(autoFollowStatsMapping.size()));
+        for (Map.Entry<String, Object> entry : serializedStatus.entrySet()) {
+            String fieldName = entry.getKey();
+            Map<?, ?> fieldMapping = (Map<?, ?>) autoFollowStatsMapping.get(fieldName);
+            assertThat(fieldMapping, notNullValue());
+
+            Object fieldValue = entry.getValue();
+            String fieldType = (String) fieldMapping.get("type");
+            if (fieldValue instanceof Long || fieldValue instanceof Integer) {
+                assertThat("expected long field type for field [" + fieldName + "]", fieldType,
+                    anyOf(equalTo("long"), equalTo("integer")));
+            } else if (fieldValue instanceof String) {
+                assertThat("expected keyword field type for field [" + fieldName + "]", fieldType,
+                    anyOf(equalTo("keyword"), equalTo("text")));
+            } else {
+                // Manual test specific object fields and if not just fail:
+                if (fieldName.equals("recent_auto_follow_errors")) {
+                    assertThat(fieldType, equalTo("nested"));
+                    assertThat(((Map<?, ?>) fieldMapping.get("properties")).size(), equalTo(2));
+                    assertThat(XContentMapValues.extractValue("properties.leader_index.type", fieldMapping), equalTo("keyword"));
+                    assertThat(XContentMapValues.extractValue("properties.auto_follow_exception.type", fieldMapping), equalTo("object"));
+
+                    Map<?, ?> exceptionFieldMapping =
+                        (Map<?, ?>) XContentMapValues.extractValue("properties.auto_follow_exception.properties", fieldMapping);
+                    assertThat(exceptionFieldMapping.size(), equalTo(2));
+                    assertThat(XContentMapValues.extractValue("type.type", exceptionFieldMapping), equalTo("keyword"));
+                    assertThat(XContentMapValues.extractValue("reason.type", exceptionFieldMapping), equalTo("text"));
+                } else {
+                    fail("unexpected field value type [" + fieldValue.getClass() + "] for field [" + fieldName + "]");
+                }
+            }
+        }
+    }
+}

+ 85 - 0
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrAutoFollowStatsCollectorTests.java

@@ -0,0 +1,85 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.monitoring.collector.ccr;
+
+import org.elasticsearch.action.ActionFuture;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.concurrent.ThreadContext;
+import org.elasticsearch.license.XPackLicenseState;
+import org.elasticsearch.xpack.core.ccr.AutoFollowStats;
+import org.elasticsearch.xpack.core.ccr.action.AutoFollowStatsAction;
+import org.elasticsearch.xpack.core.ccr.client.CcrClient;
+import org.elasticsearch.xpack.core.monitoring.MonitoredSystem;
+import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc;
+
+import java.util.Collection;
+
+import static org.elasticsearch.xpack.monitoring.MonitoringTestUtils.randomMonitoringNode;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class CcrAutoFollowStatsCollectorTests extends AbstractCcrCollectorTestCase {
+
+    @Override
+    AbstractCcrCollector createCollector(Settings settings, ClusterService clusterService, XPackLicenseState licenseState, Client client) {
+        return new CcrAutoFollowStatsCollector(settings, clusterService, licenseState, client);
+    }
+
+    public void testDoCollect() throws Exception {
+        final String clusterUuid = randomAlphaOfLength(5);
+        whenClusterStateWithUUID(clusterUuid);
+
+        final MonitoringDoc.Node node = randomMonitoringNode(random());
+        final CcrClient client = mock(CcrClient.class);
+        final ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
+
+        final TimeValue timeout = TimeValue.timeValueSeconds(randomIntBetween(1, 120));
+        withCollectionTimeout(CcrAutoFollowStatsCollector.CCR_AUTO_FOLLOW_STATS_TIMEOUT, timeout);
+
+        final CcrAutoFollowStatsCollector collector =
+            new CcrAutoFollowStatsCollector(Settings.EMPTY, clusterService, licenseState, client, threadContext);
+        assertEquals(timeout, collector.getCollectionTimeout());
+
+        final AutoFollowStats autoFollowStats = mock(AutoFollowStats.class);
+
+        @SuppressWarnings("unchecked")
+        final ActionFuture<AutoFollowStatsAction.Response> future = (ActionFuture<AutoFollowStatsAction.Response>)mock(ActionFuture.class);
+        final AutoFollowStatsAction.Response response = new AutoFollowStatsAction.Response(autoFollowStats);
+
+        when(client.autoFollowStats(any())).thenReturn(future);
+        when(future.actionGet(timeout)).thenReturn(response);
+
+        final long interval = randomNonNegativeLong();
+
+        final Collection<MonitoringDoc> documents = collector.doCollect(node, interval, clusterState);
+        verify(clusterState).metaData();
+        verify(metaData).clusterUUID();
+
+        assertThat(documents, hasSize(1));
+        final AutoFollowStatsMonitoringDoc document = (AutoFollowStatsMonitoringDoc) documents.iterator().next();
+
+        assertThat(document.getCluster(), is(clusterUuid));
+        assertThat(document.getTimestamp(), greaterThan(0L));
+        assertThat(document.getIntervalMillis(), equalTo(interval));
+        assertThat(document.getNode(), equalTo(node));
+        assertThat(document.getSystem(), is(MonitoredSystem.ES));
+        assertThat(document.getType(), is(AutoFollowStatsMonitoringDoc.TYPE));
+        assertThat(document.getId(), nullValue());
+        assertThat(document.stats(), is(autoFollowStats));
+    }
+
+}

+ 7 - 93
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsCollectorTests.java

@@ -7,17 +7,18 @@
 package org.elasticsearch.xpack.monitoring.collector.ccr;
 
 import org.elasticsearch.action.ActionFuture;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
-import org.elasticsearch.xpack.core.XPackSettings;
+import org.elasticsearch.license.XPackLicenseState;
 import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
 import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction;
 import org.elasticsearch.xpack.core.ccr.client.CcrClient;
 import org.elasticsearch.xpack.core.monitoring.MonitoredSystem;
 import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc;
-import org.elasticsearch.xpack.monitoring.BaseCollectorTestCase;
 import org.mockito.ArgumentMatcher;
 
 import java.util.ArrayList;
@@ -38,89 +39,11 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-public class CcrStatsCollectorTests extends BaseCollectorTestCase {
+public class CcrStatsCollectorTests extends AbstractCcrCollectorTestCase {
 
-    public void testShouldCollectReturnsFalseIfMonitoringNotAllowed() {
-        final Settings settings = randomFrom(ccrEnabledSettings(), ccrDisabledSettings());
-        final boolean ccrAllowed = randomBoolean();
-        final boolean isElectedMaster = randomBoolean();
-        whenLocalNodeElectedMaster(isElectedMaster);
-
-        // this controls the blockage
-        when(licenseState.isMonitoringAllowed()).thenReturn(false);
-        when(licenseState.isCcrAllowed()).thenReturn(ccrAllowed);
-
-        final CcrStatsCollector collector = new CcrStatsCollector(settings, clusterService, licenseState, client);
-
-        assertThat(collector.shouldCollect(isElectedMaster), is(false));
-        if (isElectedMaster) {
-            verify(licenseState).isMonitoringAllowed();
-        }
-    }
-
-    public void testShouldCollectReturnsFalseIfNotMaster() {
-        // regardless of CCR being enabled
-        final Settings settings = randomFrom(ccrEnabledSettings(), ccrDisabledSettings());
-
-        when(licenseState.isMonitoringAllowed()).thenReturn(randomBoolean());
-        when(licenseState.isCcrAllowed()).thenReturn(randomBoolean());
-        // this controls the blockage
-        final boolean isElectedMaster = false;
-
-        final CcrStatsCollector collector = new CcrStatsCollector(settings, clusterService, licenseState, client);
-
-        assertThat(collector.shouldCollect(isElectedMaster), is(false));
-    }
-
-    public void testShouldCollectReturnsFalseIfCCRIsDisabled() {
-        // this is controls the blockage
-        final Settings settings = ccrDisabledSettings();
-
-        when(licenseState.isMonitoringAllowed()).thenReturn(randomBoolean());
-        when(licenseState.isCcrAllowed()).thenReturn(randomBoolean());
-
-        final boolean isElectedMaster = randomBoolean();
-        whenLocalNodeElectedMaster(isElectedMaster);
-
-        final CcrStatsCollector collector = new CcrStatsCollector(settings, clusterService, licenseState, client);
-
-        assertThat(collector.shouldCollect(isElectedMaster), is(false));
-
-        if (isElectedMaster) {
-            verify(licenseState).isMonitoringAllowed();
-        }
-    }
-
-    public void testShouldCollectReturnsFalseIfCCRIsNotAllowed() {
-        final Settings settings = randomFrom(ccrEnabledSettings(), ccrDisabledSettings());
-
-        when(licenseState.isMonitoringAllowed()).thenReturn(randomBoolean());
-        // this is controls the blockage
-        when(licenseState.isCcrAllowed()).thenReturn(false);
-        final boolean isElectedMaster = randomBoolean();
-        whenLocalNodeElectedMaster(isElectedMaster);
-
-        final CcrStatsCollector collector = new CcrStatsCollector(settings, clusterService, licenseState, client);
-
-        assertThat(collector.shouldCollect(isElectedMaster), is(false));
-
-        if (isElectedMaster) {
-            verify(licenseState).isMonitoringAllowed();
-        }
-    }
-
-    public void testShouldCollectReturnsTrue() {
-        final Settings settings = ccrEnabledSettings();
-
-        when(licenseState.isMonitoringAllowed()).thenReturn(true);
-        when(licenseState.isCcrAllowed()).thenReturn(true);
-        final boolean isElectedMaster = true;
-
-        final CcrStatsCollector collector = new CcrStatsCollector(settings, clusterService, licenseState, client);
-
-        assertThat(collector.shouldCollect(isElectedMaster), is(true));
-
-        verify(licenseState).isMonitoringAllowed();
+    @Override
+    AbstractCcrCollector createCollector(Settings settings, ClusterService clusterService, XPackLicenseState licenseState, Client client) {
+        return new CcrStatsCollector(settings, clusterService, licenseState, client);
     }
 
     public void testDoCollect() throws Exception {
@@ -186,15 +109,6 @@ public class CcrStatsCollectorTests extends BaseCollectorTestCase {
         return statuses;
     }
 
-    private Settings ccrEnabledSettings() {
-        // since it's the default, we want to ensure we test both with/without it
-        return randomBoolean() ? Settings.EMPTY : Settings.builder().put(XPackSettings.CCR_ENABLED_SETTING.getKey(), true).build();
-    }
-
-    private Settings ccrDisabledSettings() {
-        return Settings.builder().put(XPackSettings.CCR_ENABLED_SETTING.getKey(), false).build();
-    }
-
     private static CcrStatsAction.StatsRequest statsRequestEq(CcrStatsAction.StatsRequest expected) {
         return argThat(new StatsRequestMatches(expected));
     }

+ 20 - 15
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowStats.java

@@ -121,28 +121,33 @@ public class AutoFollowStats implements Writeable, ToXContentObject {
     public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
         builder.startObject();
         {
-            builder.field(NUMBER_OF_FAILED_INDICES_AUTO_FOLLOWED.getPreferredName(), numberOfFailedFollowIndices);
-            builder.field(NUMBER_OF_FAILED_REMOTE_CLUSTER_STATE_REQUESTS.getPreferredName(), numberOfFailedRemoteClusterStateRequests);
-            builder.field(NUMBER_OF_SUCCESSFUL_INDICES_AUTO_FOLLOWED.getPreferredName(), numberOfSuccessfulFollowIndices);
-            builder.startArray(RECENT_AUTO_FOLLOW_ERRORS.getPreferredName());
-            {
-                for (final Map.Entry<String, ElasticsearchException> entry : recentAutoFollowErrors.entrySet()) {
+            toXContentFragment(builder, params);
+        }
+        builder.endObject();
+        return builder;
+    }
+
+    public XContentBuilder toXContentFragment(final XContentBuilder builder, final Params params) throws IOException {
+        builder.field(NUMBER_OF_FAILED_INDICES_AUTO_FOLLOWED.getPreferredName(), numberOfFailedFollowIndices);
+        builder.field(NUMBER_OF_FAILED_REMOTE_CLUSTER_STATE_REQUESTS.getPreferredName(), numberOfFailedRemoteClusterStateRequests);
+        builder.field(NUMBER_OF_SUCCESSFUL_INDICES_AUTO_FOLLOWED.getPreferredName(), numberOfSuccessfulFollowIndices);
+        builder.startArray(RECENT_AUTO_FOLLOW_ERRORS.getPreferredName());
+        {
+            for (final Map.Entry<String, ElasticsearchException> entry : recentAutoFollowErrors.entrySet()) {
+                builder.startObject();
+                {
+                    builder.field(LEADER_INDEX.getPreferredName(), entry.getKey());
+                    builder.field(AUTO_FOLLOW_EXCEPTION.getPreferredName());
                     builder.startObject();
                     {
-                        builder.field(LEADER_INDEX.getPreferredName(), entry.getKey());
-                        builder.field(AUTO_FOLLOW_EXCEPTION.getPreferredName());
-                        builder.startObject();
-                        {
-                            ElasticsearchException.generateThrowableXContent(builder, params, entry.getValue());
-                        }
-                        builder.endObject();
+                        ElasticsearchException.generateThrowableXContent(builder, params, entry.getValue());
                     }
                     builder.endObject();
                 }
+                builder.endObject();
             }
-            builder.endArray();
         }
-        builder.endObject();
+        builder.endArray();
         return builder;
     }
 

+ 12 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/client/CcrClient.java

@@ -11,6 +11,7 @@ import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.client.ElasticsearchClient;
+import org.elasticsearch.xpack.core.ccr.action.AutoFollowStatsAction;
 import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction;
 import org.elasticsearch.xpack.core.ccr.action.CreateAndFollowIndexAction;
 import org.elasticsearch.xpack.core.ccr.action.DeleteAutoFollowPatternAction;
@@ -63,6 +64,17 @@ public class CcrClient {
         return listener;
     }
 
+    public void autoFollowStats(final AutoFollowStatsAction.Request request,
+                                final ActionListener<AutoFollowStatsAction.Response> listener) {
+        client.execute(AutoFollowStatsAction.INSTANCE, request, listener);
+    }
+
+    public ActionFuture<AutoFollowStatsAction.Response> autoFollowStats(final AutoFollowStatsAction.Request request) {
+        final PlainActionFuture<AutoFollowStatsAction.Response> listener = PlainActionFuture.newFuture();
+        autoFollowStats(request, listener);
+        return listener;
+    }
+
     public void unfollow(final UnfollowIndexAction.Request request, final ActionListener<AcknowledgedResponse> listener) {
         client.execute(UnfollowIndexAction.INSTANCE, request, listener);
     }

+ 32 - 0
x-pack/plugin/core/src/main/resources/monitoring-es.json

@@ -1008,6 +1008,38 @@
               "type": "long"
             }
           }
+        },
+        "ccr_auto_follow_stats" : {
+          "properties": {
+            "number_of_failed_follow_indices": {
+              "type": "long"
+            },
+            "number_of_failed_remote_cluster_state_requests": {
+              "type": "long"
+            },
+            "number_of_successful_follow_indices": {
+              "type": "long"
+            },
+            "recent_auto_follow_errors": {
+              "type": "nested",
+              "properties": {
+                "leader_index": {
+                  "type": "keyword"
+                },
+                "auto_follow_exception": {
+                  "type": "object",
+                  "properties": {
+                    "type": {
+                      "type": "keyword"
+                    },
+                    "reason": {
+                      "type": "text"
+                    }
+                  }
+                }
+              }
+            }
+          }
         }
       }
     }

+ 3 - 0
x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/Monitoring.java

@@ -39,6 +39,7 @@ import org.elasticsearch.xpack.core.ssl.SSLService;
 import org.elasticsearch.xpack.monitoring.action.TransportMonitoringBulkAction;
 import org.elasticsearch.xpack.monitoring.cleaner.CleanerService;
 import org.elasticsearch.xpack.monitoring.collector.Collector;
+import org.elasticsearch.xpack.monitoring.collector.ccr.CcrAutoFollowStatsCollector;
 import org.elasticsearch.xpack.monitoring.collector.ccr.CcrStatsCollector;
 import org.elasticsearch.xpack.monitoring.collector.cluster.ClusterStatsCollector;
 import org.elasticsearch.xpack.monitoring.collector.indices.IndexRecoveryCollector;
@@ -144,6 +145,7 @@ public class Monitoring extends Plugin implements ActionPlugin {
         collectors.add(new IndexRecoveryCollector(settings, clusterService, getLicenseState(), client));
         collectors.add(new JobStatsCollector(settings, clusterService, getLicenseState(), client));
         collectors.add(new CcrStatsCollector(settings, clusterService, getLicenseState(), client));
+        collectors.add(new CcrAutoFollowStatsCollector(settings, clusterService, getLicenseState(), client));
 
         final MonitoringService monitoringService = new MonitoringService(settings, clusterService, threadPool, collectors, exporters);
 
@@ -183,6 +185,7 @@ public class Monitoring extends Plugin implements ActionPlugin {
         settings.add(IndexStatsCollector.INDEX_STATS_TIMEOUT);
         settings.add(JobStatsCollector.JOB_STATS_TIMEOUT);
         settings.add(CcrStatsCollector.CCR_STATS_TIMEOUT);
+        settings.add(CcrAutoFollowStatsCollector.CCR_AUTO_FOLLOW_STATS_TIMEOUT);
         settings.add(NodeStatsCollector.NODE_STATS_TIMEOUT);
         settings.addAll(Exporters.getSettings());
         return Collections.unmodifiableList(settings);

+ 71 - 0
x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/ccr/AbstractCcrCollector.java

@@ -0,0 +1,71 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.monitoring.collector.ccr;
+
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.settings.Setting;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.concurrent.ThreadContext;
+import org.elasticsearch.license.XPackLicenseState;
+import org.elasticsearch.xpack.core.XPackSettings;
+import org.elasticsearch.xpack.core.ccr.client.CcrClient;
+import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc;
+import org.elasticsearch.xpack.monitoring.collector.Collector;
+
+import java.util.Collection;
+
+import static org.elasticsearch.xpack.core.ClientHelper.MONITORING_ORIGIN;
+import static org.elasticsearch.xpack.core.ClientHelper.stashWithOrigin;
+import static org.elasticsearch.xpack.monitoring.collector.ccr.CcrStatsMonitoringDoc.TYPE;
+
+public abstract class AbstractCcrCollector extends Collector {
+
+    private final ThreadContext threadContext;
+    final CcrClient ccrClient;
+
+    AbstractCcrCollector(
+            final Settings settings,
+            final ClusterService clusterService,
+            final Setting<TimeValue> timeoutSetting,
+            final XPackLicenseState licenseState,
+            final CcrClient ccrClient,
+            final ThreadContext threadContext) {
+        super(settings, TYPE, clusterService, timeoutSetting, licenseState);
+        this.ccrClient = ccrClient;
+        this.threadContext = threadContext;
+    }
+
+    @Override
+    protected boolean shouldCollect(final boolean isElectedMaster) {
+        // this can only run when monitoring is allowed and CCR is enabled and allowed, but also only on the elected master node
+        return isElectedMaster
+                && super.shouldCollect(isElectedMaster)
+                && XPackSettings.CCR_ENABLED_SETTING.get(settings)
+                && licenseState.isCcrAllowed();
+    }
+
+
+    @Override
+    protected Collection<MonitoringDoc> doCollect(
+            final MonitoringDoc.Node node,
+            final long interval,
+            final ClusterState clusterState) throws Exception {
+        try (ThreadContext.StoredContext ignore = stashWithOrigin(threadContext, MONITORING_ORIGIN)) {
+            final long timestamp = timestamp();
+            final String clusterUuid = clusterUuid(clusterState);
+            return innerDoCollect(timestamp, clusterUuid, interval, node);
+        }
+    }
+
+    abstract Collection<MonitoringDoc> innerDoCollect(
+            long timestamp,
+            String clusterUuid,
+            long interval,
+            MonitoringDoc.Node node) throws Exception;
+}

+ 47 - 0
x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/ccr/AutoFollowStatsMonitoringDoc.java

@@ -0,0 +1,47 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.monitoring.collector.ccr;
+
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.xpack.core.ccr.AutoFollowStats;
+import org.elasticsearch.xpack.core.monitoring.MonitoredSystem;
+import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc;
+
+import java.io.IOException;
+import java.util.Objects;
+
+public class AutoFollowStatsMonitoringDoc extends MonitoringDoc {
+
+    public static final String TYPE = "ccr_auto_follow_stats";
+
+    private final AutoFollowStats stats;
+
+    public AutoFollowStats stats() {
+        return stats;
+    }
+
+    public AutoFollowStatsMonitoringDoc(
+            final String cluster,
+            final long timestamp,
+            final long intervalMillis,
+            final Node node,
+            final AutoFollowStats stats) {
+        super(cluster, timestamp, intervalMillis, node, MonitoredSystem.ES, TYPE, null);
+        this.stats = Objects.requireNonNull(stats, "stats");
+    }
+
+
+    @Override
+    protected void innerToXContent(final XContentBuilder builder, final Params params) throws IOException {
+        builder.startObject(TYPE);
+        {
+            stats.toXContentFragment(builder, params);
+        }
+        builder.endObject();
+    }
+
+}

+ 61 - 0
x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrAutoFollowStatsCollector.java

@@ -0,0 +1,61 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.monitoring.collector.ccr;
+
+import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.settings.Setting;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.concurrent.ThreadContext;
+import org.elasticsearch.license.XPackLicenseState;
+import org.elasticsearch.xpack.core.XPackClient;
+import org.elasticsearch.xpack.core.ccr.action.AutoFollowStatsAction;
+import org.elasticsearch.xpack.core.ccr.client.CcrClient;
+import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc;
+
+import java.util.Collection;
+import java.util.Collections;
+
+public final class CcrAutoFollowStatsCollector extends AbstractCcrCollector {
+
+    public static final Setting<TimeValue> CCR_AUTO_FOLLOW_STATS_TIMEOUT = collectionTimeoutSetting("ccr.auto_follow.stats.timeout");
+
+    public CcrAutoFollowStatsCollector(
+            final Settings settings,
+            final ClusterService clusterService,
+            final XPackLicenseState licenseState,
+            final Client client) {
+        super(settings, clusterService, CCR_AUTO_FOLLOW_STATS_TIMEOUT, licenseState, new XPackClient(client).ccr(),
+            client.threadPool().getThreadContext());
+    }
+
+    CcrAutoFollowStatsCollector(
+            final Settings settings,
+            final ClusterService clusterService,
+            final XPackLicenseState licenseState,
+            final CcrClient ccrClient,
+            final ThreadContext threadContext) {
+        super(settings, clusterService, CCR_AUTO_FOLLOW_STATS_TIMEOUT, licenseState, ccrClient, threadContext);
+    }
+
+    @Override
+    Collection<MonitoringDoc> innerDoCollect(
+        long timestamp,
+        String clusterUuid,
+        long interval,
+        MonitoringDoc.Node node) throws Exception {
+
+        final AutoFollowStatsAction.Request request = new AutoFollowStatsAction.Request();
+        final AutoFollowStatsAction.Response response = ccrClient.autoFollowStats(request).actionGet(getCollectionTimeout());
+
+        final AutoFollowStatsMonitoringDoc doc =
+            new AutoFollowStatsMonitoringDoc(clusterUuid, timestamp, interval, node, response.getStats());
+        return Collections.singletonList(doc);
+    }
+
+}

+ 18 - 42
x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsCollector.java

@@ -8,7 +8,6 @@ package org.elasticsearch.xpack.monitoring.collector.ccr;
 
 import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.client.Client;
-import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
@@ -16,32 +15,24 @@ import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.license.XPackLicenseState;
 import org.elasticsearch.xpack.core.XPackClient;
-import org.elasticsearch.xpack.core.XPackSettings;
 import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction;
 import org.elasticsearch.xpack.core.ccr.client.CcrClient;
 import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc;
-import org.elasticsearch.xpack.monitoring.collector.Collector;
 
 import java.util.Collection;
 import java.util.stream.Collectors;
 
-import static org.elasticsearch.xpack.core.ClientHelper.MONITORING_ORIGIN;
-import static org.elasticsearch.xpack.core.ClientHelper.stashWithOrigin;
-import static org.elasticsearch.xpack.monitoring.collector.ccr.CcrStatsMonitoringDoc.TYPE;
-
-public class CcrStatsCollector extends Collector {
+public final class CcrStatsCollector extends AbstractCcrCollector {
 
     public static final Setting<TimeValue> CCR_STATS_TIMEOUT = collectionTimeoutSetting("ccr.stats.timeout");
 
-    private final ThreadContext threadContext;
-    private final CcrClient ccrClient;
-
     public CcrStatsCollector(
             final Settings settings,
             final ClusterService clusterService,
             final XPackLicenseState licenseState,
             final Client client) {
-        this(settings, clusterService, licenseState, new XPackClient(client).ccr(), client.threadPool().getThreadContext());
+        super(settings, clusterService, CCR_STATS_TIMEOUT, licenseState, new XPackClient(client).ccr(),
+            client.threadPool().getThreadContext());
     }
 
     CcrStatsCollector(
@@ -50,41 +41,26 @@ public class CcrStatsCollector extends Collector {
             final XPackLicenseState licenseState,
             final CcrClient ccrClient,
             final ThreadContext threadContext) {
-        super(settings, TYPE, clusterService, CCR_STATS_TIMEOUT, licenseState);
-        this.ccrClient = ccrClient;
-        this.threadContext = threadContext;
-    }
-
-    @Override
-    protected boolean shouldCollect(final boolean isElectedMaster) {
-        // this can only run when monitoring is allowed and CCR is enabled and allowed, but also only on the elected master node
-        return isElectedMaster
-                && super.shouldCollect(isElectedMaster)
-                && XPackSettings.CCR_ENABLED_SETTING.get(settings)
-                && licenseState.isCcrAllowed();
+        super(settings, clusterService, CCR_STATS_TIMEOUT, licenseState, ccrClient, threadContext);
     }
 
-
     @Override
-    protected Collection<MonitoringDoc> doCollect(
-            final MonitoringDoc.Node node,
-            final long interval,
-            final ClusterState clusterState) throws Exception {
-        try (ThreadContext.StoredContext ignore = stashWithOrigin(threadContext, MONITORING_ORIGIN)) {
-            final CcrStatsAction.StatsRequest request = new CcrStatsAction.StatsRequest();
-            request.setIndices(getCollectionIndices());
-            request.setIndicesOptions(IndicesOptions.lenientExpandOpen());
-            final CcrStatsAction.StatsResponses responses = ccrClient.stats(request).actionGet(getCollectionTimeout());
+    Collection<MonitoringDoc> innerDoCollect(
+        long timestamp,
+        String clusterUuid,
+        long interval,
+        MonitoringDoc.Node node) throws Exception {
 
-            final long timestamp = timestamp();
-            final String clusterUuid = clusterUuid(clusterState);
+        final CcrStatsAction.StatsRequest request = new CcrStatsAction.StatsRequest();
+        request.setIndices(getCollectionIndices());
+        request.setIndicesOptions(IndicesOptions.lenientExpandOpen());
+        final CcrStatsAction.StatsResponses responses = ccrClient.stats(request).actionGet(getCollectionTimeout());
 
-            return responses
-                    .getStatsResponses()
-                    .stream()
-                    .map(stats -> new CcrStatsMonitoringDoc(clusterUuid, timestamp, interval, node, stats.status()))
-                    .collect(Collectors.toList());
-        }
+        return responses
+            .getStatsResponses()
+            .stream()
+            .map(stats -> new CcrStatsMonitoringDoc(clusterUuid, timestamp, interval, node, stats.status()))
+            .collect(Collectors.toList());
     }
 
 }