Browse Source

Add license checks for auto-follow implementation (#33496)

This commit adds license checks for the auto-follow implementation. We
check the license on put auto-follow patterns, and then for every
coordination round we check that the local and remote clusters are
licensed for CCR. In the case of non-compliance, we skip coordination
yet continue to schedule follow-ups.
Jason Tedor 7 years ago
parent
commit
5a38c930fc

+ 32 - 1
test/framework/src/main/java/org/elasticsearch/test/MockLogAppender.java

@@ -85,7 +85,7 @@ public class MockLogAppender extends AbstractAppender {
 
         @Override
         public void match(LogEvent event) {
-            if (event.getLevel().equals(level) && event.getLoggerName().equals(logger)) {
+            if (event.getLevel().equals(level) && event.getLoggerName().equals(logger) && innerMatch(event)) {
                 if (Regex.isSimpleMatchPattern(message)) {
                     if (Regex.simpleMatch(message, event.getMessage().getFormattedMessage())) {
                         saw = true;
@@ -97,6 +97,11 @@ public class MockLogAppender extends AbstractAppender {
                 }
             }
         }
+
+        public boolean innerMatch(final LogEvent event) {
+            return true;
+        }
+
     }
 
     public static class UnseenEventExpectation extends AbstractEventExpectation {
@@ -123,6 +128,32 @@ public class MockLogAppender extends AbstractAppender {
         }
     }
 
+    public static class ExceptionSeenEventExpectation extends SeenEventExpectation {
+
+        private final Class<? extends Exception> clazz;
+        private final String exceptionMessage;
+
+        public ExceptionSeenEventExpectation(
+                final String name,
+                final String logger,
+                final Level level,
+                final String message,
+                final Class<? extends Exception> clazz,
+                final String exceptionMessage) {
+            super(name, logger, level, message);
+            this.clazz = clazz;
+            this.exceptionMessage = exceptionMessage;
+        }
+
+        @Override
+        public boolean innerMatch(final LogEvent event) {
+            return event.getThrown() != null
+                    && event.getThrown().getClass() == clazz
+                    && event.getThrown().getMessage().equals(exceptionMessage);
+        }
+
+    }
+
     public static class PatternSeenEventExcpectation implements LoggingExpectation {
 
         protected final String name;

+ 15 - 0
x-pack/plugin/ccr/qa/multi-cluster-with-incompatible-license/build.gradle → x-pack/plugin/ccr/qa/multi-cluster-with-non-compliant-license/build.gradle

@@ -20,7 +20,20 @@ leaderClusterTestRunner {
     systemProperty 'tests.is_leader_cluster', 'true'
 }
 
+task writeJavaPolicy {
+    doLast {
+        final File javaPolicy = file("${buildDir}/tmp/java.policy")
+        javaPolicy.write(
+                [
+                        "grant {",
+                        "  permission java.io.FilePermission \"${-> followClusterTest.getNodes().get(0).homeDir}/logs/${-> followClusterTest.getNodes().get(0).clusterName}.log\", \"read\";",
+                        "};"
+                ].join("\n"))
+    }
+}
+
 task followClusterTest(type: RestIntegTestTask) {}
+followClusterTest.dependsOn writeJavaPolicy
 
 followClusterTestCluster {
     dependsOn leaderClusterTestRunner
@@ -31,8 +44,10 @@ followClusterTestCluster {
 }
 
 followClusterTestRunner {
+    systemProperty 'java.security.policy', "file://${buildDir}/tmp/java.policy"
     systemProperty 'tests.is_leader_cluster', 'false'
     systemProperty 'tests.leader_host', "${-> leaderClusterTest.nodes.get(0).httpUri()}"
+    systemProperty 'log', "${-> followClusterTest.getNodes().get(0).homeDir}/logs/${-> followClusterTest.getNodes().get(0).clusterName}.log"
     finalizedBy 'leaderClusterTestCluster#stop'
 }
 

+ 41 - 3
x-pack/plugin/ccr/qa/multi-cluster-with-incompatible-license/src/test/java/org/elasticsearch/xpack/ccr/CcrMultiClusterLicenseIT.java → x-pack/plugin/ccr/qa/multi-cluster-with-non-compliant-license/src/test/java/org/elasticsearch/xpack/ccr/CcrMultiClusterLicenseIT.java

@@ -9,11 +9,16 @@ package org.elasticsearch.xpack.ccr;
 import org.elasticsearch.client.Request;
 import org.elasticsearch.client.ResponseException;
 import org.elasticsearch.common.Booleans;
+import org.elasticsearch.common.io.PathUtils;
 import org.elasticsearch.test.rest.ESRestTestCase;
 
+import java.nio.file.Files;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Locale;
 
 import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasToString;
 
 public class CcrMultiClusterLicenseIT extends ESRestTestCase {
@@ -29,7 +34,7 @@ public class CcrMultiClusterLicenseIT extends ESRestTestCase {
         if (runningAgainstLeaderCluster == false) {
             final Request request = new Request("POST", "/follower/_ccr/follow");
             request.setJsonEntity("{\"leader_index\": \"leader_cluster:leader\"}");
-            assertLicenseIncompatible(request);
+            assertNonCompliantLicense(request);
         }
     }
 
@@ -37,11 +42,44 @@ public class CcrMultiClusterLicenseIT extends ESRestTestCase {
         if (runningAgainstLeaderCluster == false) {
             final Request request = new Request("POST", "/follower/_ccr/create_and_follow");
             request.setJsonEntity("{\"leader_index\": \"leader_cluster:leader\"}");
-            assertLicenseIncompatible(request);
+            assertNonCompliantLicense(request);
         }
     }
 
-    private static void assertLicenseIncompatible(final Request request) {
+    public void testAutoFollow() throws Exception {
+        if (runningAgainstLeaderCluster == false) {
+            final Request request = new Request("PUT", "/_ccr/_auto_follow/leader_cluster");
+            request.setJsonEntity("{\"leader_index_patterns\":[\"*\"]}");
+            client().performRequest(request);
+
+            // parse the logs and ensure that the auto-coordinator skipped coordination on the leader cluster
+            assertBusy(() -> {
+                final List<String> lines = Files.readAllLines(PathUtils.get(System.getProperty("log")));
+
+                final Iterator<String> it = lines.iterator();
+
+                boolean warn = false;
+                while (it.hasNext()) {
+                    final String line = it.next();
+                    if (line.matches(".*\\[WARN\\s*\\]\\[o\\.e\\.x\\.c\\.a\\.AutoFollowCoordinator\\s*\\] \\[node-0\\] " +
+                            "failure occurred during auto-follower coordination")) {
+                        warn = true;
+                        break;
+                    }
+                }
+                assertTrue(warn);
+                assertTrue(it.hasNext());
+                final String lineAfterWarn = it.next();
+                assertThat(
+                        lineAfterWarn,
+                        equalTo("org.elasticsearch.ElasticsearchStatusException: " +
+                                "can not fetch remote cluster state as the remote cluster [leader_cluster] is not licensed for [ccr]; " +
+                                "the license mode [BASIC] on cluster [leader_cluster] does not enable [ccr]"));
+            });
+        }
+    }
+
+    private static void assertNonCompliantLicense(final Request request) {
         final ResponseException e = expectThrows(ResponseException.class, () -> client().performRequest(request));
         final String expected = String.format(
                 Locale.ROOT,

+ 1 - 1
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java

@@ -126,7 +126,7 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
 
         return Arrays.asList(
             ccrLicenseChecker,
-            new AutoFollowCoordinator(settings, client, threadPool, clusterService)
+            new AutoFollowCoordinator(settings, client, threadPool, clusterService, ccrLicenseChecker)
         );
     }
 

+ 101 - 23
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java

@@ -23,6 +23,7 @@ import java.util.Locale;
 import java.util.Objects;
 import java.util.function.BooleanSupplier;
 import java.util.function.Consumer;
+import java.util.function.Function;
 
 /**
  * Encapsulates licensing checking for CCR.
@@ -58,14 +59,13 @@ public final class CcrLicenseChecker {
 
     /**
      * Fetches the leader index metadata from the remote cluster. Before fetching the index metadata, the remote cluster is checked for
-     * license compatibility with CCR. If the remote cluster is not licensed for CCR, the {@link ActionListener#onFailure(Exception)} method
-     * of the specified listener is invoked. Otherwise, the specified consumer is invoked with the leader index metadata fetched from the
-     * remote cluster.
+     * license compatibility with CCR. If the remote cluster is not licensed for CCR, the {@code onFailure} consumer is is invoked.
+     * Otherwise, the specified consumer is invoked with the leader index metadata fetched from the remote cluster.
      *
      * @param client                      the client
      * @param clusterAlias                the remote cluster alias
      * @param leaderIndex                 the name of the leader index
-     * @param listener                    the listener
+     * @param onFailure                   the failure consumer
      * @param leaderIndexMetadataConsumer the leader index metadata consumer
      * @param <T>                         the type of response the listener is waiting for
      */
@@ -73,8 +73,75 @@ public final class CcrLicenseChecker {
             final Client client,
             final String clusterAlias,
             final String leaderIndex,
-            final ActionListener<T> listener,
+            final Consumer<Exception> onFailure,
             final Consumer<IndexMetaData> leaderIndexMetadataConsumer) {
+
+        final ClusterStateRequest request = new ClusterStateRequest();
+        request.clear();
+        request.metaData(true);
+        request.indices(leaderIndex);
+        checkRemoteClusterLicenseAndFetchClusterState(
+                client,
+                clusterAlias,
+                request,
+                onFailure,
+                leaderClusterState -> leaderIndexMetadataConsumer.accept(leaderClusterState.getMetaData().index(leaderIndex)),
+                licenseCheck -> indexMetadataNonCompliantRemoteLicense(leaderIndex, licenseCheck),
+                e -> indexMetadataUnknownRemoteLicense(leaderIndex, clusterAlias, e));
+    }
+
+    /**
+     * Fetches the leader cluster state from the remote cluster by the specified cluster state request. Before fetching the cluster state,
+     * the remote cluster is checked for license compliance with CCR. If the remote cluster is not licensed for CCR,
+     * the {@code onFailure} consumer is invoked. Otherwise, the specified consumer is invoked with the leader cluster state fetched from
+     * the remote cluster.
+     *
+     * @param client                     the client
+     * @param clusterAlias               the remote cluster alias
+     * @param request                    the cluster state request
+     * @param onFailure                  the failure consumer
+     * @param leaderClusterStateConsumer the leader cluster state consumer
+     * @param <T>                        the type of response the listener is waiting for
+     */
+    public <T> void checkRemoteClusterLicenseAndFetchClusterState(
+            final Client client,
+            final String clusterAlias,
+            final ClusterStateRequest request,
+            final Consumer<Exception> onFailure,
+            final Consumer<ClusterState> leaderClusterStateConsumer) {
+        checkRemoteClusterLicenseAndFetchClusterState(
+                client,
+                clusterAlias,
+                request,
+                onFailure,
+                leaderClusterStateConsumer,
+                CcrLicenseChecker::clusterStateNonCompliantRemoteLicense,
+                e -> clusterStateUnknownRemoteLicense(clusterAlias, e));
+    }
+
+    /**
+     * Fetches the leader cluster state from the remote cluster by the specified cluster state request. Before fetching the cluster state,
+     * the remote cluster is checked for license compliance with CCR. If the remote cluster is not licensed for CCR,
+     * the {@code onFailure} consumer is invoked. Otherwise, the specified consumer is invoked with the leader cluster state fetched from
+     * the remote cluster.
+     *
+     * @param client                     the client
+     * @param clusterAlias               the remote cluster alias
+     * @param request                    the cluster state request
+     * @param onFailure                  the failure consumer
+     * @param leaderClusterStateConsumer the leader cluster state consumer
+     * @param nonCompliantLicense        the supplier for when the license state of the remote cluster is non-compliant
+     * @param unknownLicense             the supplier for when the license state of the remote cluster is unknown due to failure
+     * @param <T>                        the type of response the listener is waiting for
+     */
+    private <T> void checkRemoteClusterLicenseAndFetchClusterState(
+            final Client client,
+            final String clusterAlias,
+            final ClusterStateRequest request,
+            final Consumer<Exception> onFailure,
+            final Consumer<ClusterState> leaderClusterStateConsumer,
+            final Function<RemoteClusterLicenseChecker.LicenseCheck, ElasticsearchStatusException> nonCompliantLicense,
+            final Function<Exception, ElasticsearchStatusException> unknownLicense) {
         // we have to check the license on the remote cluster
         new RemoteClusterLicenseChecker(client, XPackLicenseState::isCcrAllowedForOperationMode).checkRemoteClusterLicenses(
                 Collections.singletonList(clusterAlias),
@@ -83,35 +150,25 @@ public final class CcrLicenseChecker {
                     @Override
                     public void onResponse(final RemoteClusterLicenseChecker.LicenseCheck licenseCheck) {
                         if (licenseCheck.isSuccess()) {
-                            final Client remoteClient = client.getRemoteClusterClient(clusterAlias);
-                            final ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
-                            clusterStateRequest.clear();
-                            clusterStateRequest.metaData(true);
-                            clusterStateRequest.indices(leaderIndex);
-                            final ActionListener<ClusterStateResponse> clusterStateListener = ActionListener.wrap(
-                                    r -> {
-                                        final ClusterState remoteClusterState = r.getState();
-                                        final IndexMetaData leaderIndexMetadata =
-                                                remoteClusterState.getMetaData().index(leaderIndex);
-                                        leaderIndexMetadataConsumer.accept(leaderIndexMetadata);
-                                    },
-                                    listener::onFailure);
+                            final Client leaderClient = client.getRemoteClusterClient(clusterAlias);
+                            final ActionListener<ClusterStateResponse> clusterStateListener =
+                                    ActionListener.wrap(s -> leaderClusterStateConsumer.accept(s.getState()), onFailure);
                             // following an index in remote cluster, so use remote client to fetch leader index metadata
-                            remoteClient.admin().cluster().state(clusterStateRequest, clusterStateListener);
+                            leaderClient.admin().cluster().state(request, clusterStateListener);
                         } else {
-                            listener.onFailure(incompatibleRemoteLicense(leaderIndex, licenseCheck));
+                            onFailure.accept(nonCompliantLicense.apply(licenseCheck));
                         }
                     }
 
                     @Override
                     public void onFailure(final Exception e) {
-                        listener.onFailure(unknownRemoteLicense(leaderIndex, clusterAlias, e));
+                        onFailure.accept(unknownLicense.apply(e));
                     }
 
                 });
     }
 
-    private static ElasticsearchStatusException incompatibleRemoteLicense(
+    private static ElasticsearchStatusException indexMetadataNonCompliantRemoteLicense(
             final String leaderIndex, final RemoteClusterLicenseChecker.LicenseCheck licenseCheck) {
         final String clusterAlias = licenseCheck.remoteClusterLicenseInfo().clusterAlias();
         final String message = String.format(
@@ -127,7 +184,21 @@ public final class CcrLicenseChecker {
         return new ElasticsearchStatusException(message, RestStatus.BAD_REQUEST);
     }
 
-    private static ElasticsearchStatusException unknownRemoteLicense(
+    private static ElasticsearchStatusException clusterStateNonCompliantRemoteLicense(
+            final RemoteClusterLicenseChecker.LicenseCheck licenseCheck) {
+        final String clusterAlias = licenseCheck.remoteClusterLicenseInfo().clusterAlias();
+        final String message = String.format(
+                Locale.ROOT,
+                "can not fetch remote cluster state as the remote cluster [%s] is not licensed for [ccr]; %s",
+                clusterAlias,
+                RemoteClusterLicenseChecker.buildErrorMessage(
+                        "ccr",
+                        licenseCheck.remoteClusterLicenseInfo(),
+                        RemoteClusterLicenseChecker::isLicensePlatinumOrTrial));
+        return new ElasticsearchStatusException(message, RestStatus.BAD_REQUEST);
+    }
+
+    private static ElasticsearchStatusException indexMetadataUnknownRemoteLicense(
             final String leaderIndex, final String clusterAlias, final Exception cause) {
         final String message = String.format(
                 Locale.ROOT,
@@ -138,4 +209,11 @@ public final class CcrLicenseChecker {
         return new ElasticsearchStatusException(message, RestStatus.BAD_REQUEST, cause);
     }
 
+    private static ElasticsearchStatusException clusterStateUnknownRemoteLicense(final String clusterAlias, final Exception cause) {
+        final String message = String.format(
+                Locale.ROOT,
+                "can not fetch remote cluster state as the license state of the remote cluster [%s] could not be determined", clusterAlias);
+        return new ElasticsearchStatusException(message, RestStatus.BAD_REQUEST, cause);
+    }
+
 }

+ 47 - 29
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java

@@ -21,7 +21,9 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.CountDown;
 import org.elasticsearch.index.Index;
+import org.elasticsearch.license.LicenseUtils;
 import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.xpack.ccr.CcrLicenseChecker;
 import org.elasticsearch.xpack.ccr.CcrSettings;
 import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
 import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern;
@@ -30,6 +32,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
@@ -47,22 +50,32 @@ public class AutoFollowCoordinator implements ClusterStateApplier {
     private final TimeValue pollInterval;
     private final ThreadPool threadPool;
     private final ClusterService clusterService;
+    private final CcrLicenseChecker ccrLicenseChecker;
 
     private volatile boolean localNodeMaster = false;
 
-    public AutoFollowCoordinator(Settings settings,
-                                 Client client,
-                                 ThreadPool threadPool,
-                                 ClusterService clusterService) {
+    public AutoFollowCoordinator(
+            Settings settings,
+            Client client,
+            ThreadPool threadPool,
+            ClusterService clusterService,
+            CcrLicenseChecker ccrLicenseChecker) {
         this.client = client;
         this.threadPool = threadPool;
         this.clusterService = clusterService;
+        this.ccrLicenseChecker = Objects.requireNonNull(ccrLicenseChecker, "ccrLicenseChecker");
 
         this.pollInterval = CcrSettings.CCR_AUTO_FOLLOW_POLL_INTERVAL.get(settings);
         clusterService.addStateApplier(this);
     }
 
     private void doAutoFollow() {
+        if (ccrLicenseChecker.isCcrAllowed() == false) {
+            // TODO: set non-compliant status on auto-follow coordination that can be viewed via a stats API
+            LOGGER.warn("skipping auto-follower coordination", LicenseUtils.newComplianceException("ccr"));
+            threadPool.schedule(pollInterval, ThreadPool.Names.SAME, this::doAutoFollow);
+            return;
+        }
         if (localNodeMaster == false) {
             return;
         }
@@ -80,23 +93,32 @@ public class AutoFollowCoordinator implements ClusterStateApplier {
 
         Consumer<Exception> handler = e -> {
             if (e != null) {
-                LOGGER.warn("Failure occurred during auto following indices", e);
+                LOGGER.warn("failure occurred during auto-follower coordination", e);
             }
             threadPool.schedule(pollInterval, ThreadPool.Names.SAME, this::doAutoFollow);
         };
-        AutoFollower operation = new AutoFollower(client, handler, followerClusterState) {
+        AutoFollower operation = new AutoFollower(handler, followerClusterState) {
 
             @Override
-            void getLeaderClusterState(Client leaderClient, BiConsumer<ClusterState, Exception> handler) {
-                ClusterStateRequest request = new ClusterStateRequest();
+            void getLeaderClusterState(final String leaderClusterAlias, final BiConsumer<ClusterState, Exception> handler) {
+                final ClusterStateRequest request = new ClusterStateRequest();
                 request.clear();
                 request.metaData(true);
-                leaderClient.admin().cluster().state(request,
-                    ActionListener.wrap(
-                        r -> handler.accept(r.getState(), null),
-                        e -> handler.accept(null, e)
-                    )
-                );
+
+                if ("_local_".equals(leaderClusterAlias)) {
+                    client.admin().cluster().state(
+                            request, ActionListener.wrap(r -> handler.accept(r.getState(), null), e -> handler.accept(null, e)));
+                } else {
+                    final Client leaderClient = client.getRemoteClusterClient(leaderClusterAlias);
+                    // TODO: set non-compliant status on auto-follow coordination that can be viewed via a stats API
+                    ccrLicenseChecker.checkRemoteClusterLicenseAndFetchClusterState(
+                            leaderClient,
+                            leaderClusterAlias,
+                            request,
+                            e -> handler.accept(null, e),
+                            leaderClusterState -> handler.accept(leaderClusterState, null));
+                }
+
             }
 
             @Override
@@ -143,7 +165,6 @@ public class AutoFollowCoordinator implements ClusterStateApplier {
 
     abstract static class AutoFollower {
 
-        private final Client client;
         private final Consumer<Exception> handler;
         private final ClusterState followerClusterState;
         private final AutoFollowMetadata autoFollowMetadata;
@@ -151,8 +172,7 @@ public class AutoFollowCoordinator implements ClusterStateApplier {
         private final CountDown autoFollowPatternsCountDown;
         private final AtomicReference<Exception> autoFollowPatternsErrorHolder = new AtomicReference<>();
 
-        AutoFollower(Client client, Consumer<Exception> handler, ClusterState followerClusterState) {
-            this.client = client;
+        AutoFollower(final Consumer<Exception> handler, final ClusterState followerClusterState) {
             this.handler = handler;
             this.followerClusterState = followerClusterState;
             this.autoFollowMetadata = followerClusterState.getMetaData().custom(AutoFollowMetadata.TYPE);
@@ -163,10 +183,9 @@ public class AutoFollowCoordinator implements ClusterStateApplier {
             for (Map.Entry<String, AutoFollowPattern> entry : autoFollowMetadata.getPatterns().entrySet()) {
                 String clusterAlias = entry.getKey();
                 AutoFollowPattern autoFollowPattern = entry.getValue();
-                Client leaderClient = clusterAlias.equals("_local_") ? client : client.getRemoteClusterClient(clusterAlias);
                 List<String> followedIndices = autoFollowMetadata.getFollowedLeaderIndexUUIDs().get(clusterAlias);
 
-                getLeaderClusterState(leaderClient, (leaderClusterState, e) -> {
+                getLeaderClusterState(clusterAlias, (leaderClusterState, e) -> {
                     if (leaderClusterState != null) {
                         assert e == null;
                         handleClusterAlias(clusterAlias, autoFollowPattern, followedIndices, leaderClusterState);
@@ -289,18 +308,17 @@ public class AutoFollowCoordinator implements ClusterStateApplier {
             };
         }
 
-        // abstract methods to make unit testing possible:
-
-        abstract void getLeaderClusterState(Client leaderClient,
-                                            BiConsumer<ClusterState,
-                                            Exception> handler);
+        /**
+         * Fetch the cluster state from the leader with the specified cluster alias
+         *
+         * @param leaderClusterAlias the cluster alias of the leader
+         * @param handler            the callback to invoke
+         */
+        abstract void getLeaderClusterState(String leaderClusterAlias, BiConsumer<ClusterState, Exception> handler);
 
-        abstract void createAndFollow(FollowIndexAction.Request followRequest,
-                                      Runnable successHandler,
-                                      Consumer<Exception> failureHandler);
+        abstract void createAndFollow(FollowIndexAction.Request followRequest, Runnable successHandler, Consumer<Exception> failureHandler);
 
-        abstract void updateAutoFollowMetadata(Function<ClusterState, ClusterState> updateFunction,
-                                               Consumer<Exception> handler);
+        abstract void updateAutoFollowMetadata(Function<ClusterState, ClusterState> updateFunction, Consumer<Exception> handler);
 
     }
 }

+ 1 - 1
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CreateAndFollowIndexAction.java

@@ -255,7 +255,7 @@ public class CreateAndFollowIndexAction extends Action<CreateAndFollowIndexActio
                     client,
                     clusterAlias,
                     leaderIndex,
-                    listener,
+                    listener::onFailure,
                     leaderIndexMetaData -> createFollowerIndex(leaderIndexMetaData, request, listener));
         }
 

+ 1 - 1
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowIndexAction.java

@@ -370,7 +370,7 @@ public class FollowIndexAction extends Action<AcknowledgedResponse> {
                     client,
                     clusterAlias,
                     leaderIndex,
-                    listener,
+                    listener::onFailure,
                     leaderIndexMetadata -> {
                         try {
                             start(request, clusterAlias, leaderIndexMetadata, followerIndexMetadata, listener);

+ 38 - 19
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java

@@ -21,8 +21,10 @@ import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.license.LicenseUtils;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.xpack.ccr.CcrLicenseChecker;
 import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
 import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern;
 
@@ -30,20 +32,29 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.stream.Collectors;
 
 public class TransportPutAutoFollowPatternAction extends
     TransportMasterNodeAction<PutAutoFollowPatternAction.Request, AcknowledgedResponse> {
 
     private final Client client;
+    private final CcrLicenseChecker ccrLicenseChecker;
 
     @Inject
-    public TransportPutAutoFollowPatternAction(Settings settings, TransportService transportService, ClusterService clusterService,
-                                               ThreadPool threadPool, ActionFilters actionFilters, Client client,
-                                               IndexNameExpressionResolver indexNameExpressionResolver) {
+    public TransportPutAutoFollowPatternAction(
+            final Settings settings,
+            final TransportService transportService,
+            final ClusterService clusterService,
+            final ThreadPool threadPool,
+            final ActionFilters actionFilters,
+            final Client client,
+            final IndexNameExpressionResolver indexNameExpressionResolver,
+            final CcrLicenseChecker ccrLicenseChecker) {
         super(settings, PutAutoFollowPatternAction.NAME, transportService, clusterService, threadPool, actionFilters,
             indexNameExpressionResolver, PutAutoFollowPatternAction.Request::new);
         this.client = client;
+        this.ccrLicenseChecker = Objects.requireNonNull(ccrLicenseChecker, "ccrLicenseChecker");
     }
 
     @Override
@@ -60,6 +71,10 @@ public class TransportPutAutoFollowPatternAction extends
     protected void masterOperation(PutAutoFollowPatternAction.Request request,
                                    ClusterState state,
                                    ActionListener<AcknowledgedResponse> listener) throws Exception {
+        if (ccrLicenseChecker.isCcrAllowed() == false) {
+            listener.onFailure(LicenseUtils.newComplianceException("ccr"));
+            return;
+        }
         final Client leaderClient;
         if (request.getLeaderClusterAlias().equals("_local_")) {
             leaderClient = client;
@@ -71,22 +86,26 @@ public class TransportPutAutoFollowPatternAction extends
         clusterStateRequest.clear();
         clusterStateRequest.metaData(true);
 
-        leaderClient.admin().cluster().state(clusterStateRequest, ActionListener.wrap(clusterStateResponse -> {
-            final ClusterState leaderClusterState = clusterStateResponse.getState();
-            clusterService.submitStateUpdateTask("put-auto-follow-pattern-" + request.getLeaderClusterAlias(),
-                new AckedClusterStateUpdateTask<AcknowledgedResponse>(request, listener) {
-
-                    @Override
-                    protected AcknowledgedResponse newResponse(boolean acknowledged) {
-                        return new AcknowledgedResponse(acknowledged);
-                    }
-
-                    @Override
-                    public ClusterState execute(ClusterState currentState) throws Exception {
-                        return innerPut(request, currentState, leaderClusterState);
-                    }
-                });
-        }, listener::onFailure));
+        leaderClient.admin().cluster().state(
+                clusterStateRequest,
+                ActionListener.wrap(
+                        clusterStateResponse -> {
+                            final ClusterState leaderClusterState = clusterStateResponse.getState();
+                            clusterService.submitStateUpdateTask("put-auto-follow-pattern-" + request.getLeaderClusterAlias(),
+                                    new AckedClusterStateUpdateTask<AcknowledgedResponse>(request, listener) {
+
+                                        @Override
+                                        protected AcknowledgedResponse newResponse(boolean acknowledged) {
+                                            return new AcknowledgedResponse(acknowledged);
+                                        }
+
+                                        @Override
+                                        public ClusterState execute(ClusterState currentState) throws Exception {
+                                            return innerPut(request, currentState, leaderClusterState);
+                                        }
+                                    });
+                        },
+                        listener::onFailure));
     }
 
     static ClusterState innerPut(PutAutoFollowPatternAction.Request request,

+ 60 - 8
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java

@@ -6,15 +6,22 @@
 
 package org.elasticsearch.xpack.ccr;
 
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.elasticsearch.ElasticsearchSecurityException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.test.ESSingleNodeTestCase;
+import org.elasticsearch.test.MockLogAppender;
+import org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator;
 import org.elasticsearch.xpack.ccr.action.CcrStatsAction;
 import org.elasticsearch.xpack.ccr.action.CreateAndFollowIndexAction;
 import org.elasticsearch.xpack.ccr.action.FollowIndexAction;
+import org.elasticsearch.xpack.ccr.action.PutAutoFollowPatternAction;
 import org.elasticsearch.xpack.ccr.action.ShardFollowNodeTask;
 
 import java.util.Collection;
@@ -28,10 +35,10 @@ public class CcrLicenseIT extends ESSingleNodeTestCase {
 
     @Override
     protected Collection<Class<? extends Plugin>> getPlugins() {
-        return Collections.singletonList(IncompatibleLicenseLocalStateCcr.class);
+        return Collections.singletonList(NonCompliantLicenseLocalStateCcr.class);
     }
 
-    public void testThatFollowingIndexIsUnavailableWithIncompatibleLicense() throws InterruptedException {
+    public void testThatFollowingIndexIsUnavailableWithNonCompliantLicense() throws InterruptedException {
         final FollowIndexAction.Request followRequest = getFollowRequest();
         final CountDownLatch latch = new CountDownLatch(1);
         client().execute(
@@ -45,14 +52,14 @@ public class CcrLicenseIT extends ESSingleNodeTestCase {
 
                     @Override
                     public void onFailure(final Exception e) {
-                        assertIncompatibleLicense(e);
+                        assertNonCompliantLicense(e);
                         latch.countDown();
                     }
                 });
         latch.await();
     }
 
-    public void testThatCreateAndFollowingIndexIsUnavailableWithIncompatibleLicense() throws InterruptedException {
+    public void testThatCreateAndFollowingIndexIsUnavailableWithNonCompliantLicense() throws InterruptedException {
         final FollowIndexAction.Request followRequest = getFollowRequest();
         final CreateAndFollowIndexAction.Request createAndFollowRequest = new CreateAndFollowIndexAction.Request(followRequest);
         final CountDownLatch latch = new CountDownLatch(1);
@@ -67,14 +74,14 @@ public class CcrLicenseIT extends ESSingleNodeTestCase {
 
                     @Override
                     public void onFailure(final Exception e) {
-                        assertIncompatibleLicense(e);
+                        assertNonCompliantLicense(e);
                         latch.countDown();
                     }
                 });
         latch.await();
     }
 
-    public void testThatCcrStatsAreUnavailableWithIncompatibleLicense() throws InterruptedException {
+    public void testThatCcrStatsAreUnavailableWithNonCompliantLicense() throws InterruptedException {
         final CountDownLatch latch = new CountDownLatch(1);
         client().execute(CcrStatsAction.INSTANCE, new CcrStatsAction.TasksRequest(), new ActionListener<CcrStatsAction.TasksResponse>() {
             @Override
@@ -84,7 +91,7 @@ public class CcrLicenseIT extends ESSingleNodeTestCase {
 
             @Override
             public void onFailure(final Exception e) {
-                assertIncompatibleLicense(e);
+                assertNonCompliantLicense(e);
                 latch.countDown();
             }
         });
@@ -92,7 +99,52 @@ public class CcrLicenseIT extends ESSingleNodeTestCase {
         latch.await();
     }
 
-    private void assertIncompatibleLicense(final Exception e) {
+    public void testThatPutAutoFollowPatternsIsUnavailableWithNonCompliantLicense() throws InterruptedException {
+        final CountDownLatch latch = new CountDownLatch(1);
+        final PutAutoFollowPatternAction.Request request = new PutAutoFollowPatternAction.Request();
+        request.setLeaderClusterAlias("leader");
+        request.setLeaderIndexPatterns(Collections.singletonList("*"));
+        client().execute(
+                PutAutoFollowPatternAction.INSTANCE,
+                request,
+                new ActionListener<AcknowledgedResponse>() {
+                    @Override
+                    public void onResponse(final AcknowledgedResponse response) {
+                        latch.countDown();
+                        fail();
+                    }
+
+                    @Override
+                    public void onFailure(final Exception e) {
+                        assertNonCompliantLicense(e);
+                        latch.countDown();
+                    }
+                });
+        latch.await();
+    }
+
+    public void testAutoFollowCoordinatorLogsSkippingAutoFollowCoordinationWithNonCompliantLicense() throws Exception {
+        final Logger logger = LogManager.getLogger(AutoFollowCoordinator.class);
+        final MockLogAppender appender = new MockLogAppender();
+        appender.start();
+        appender.addExpectation(
+                new MockLogAppender.ExceptionSeenEventExpectation(
+                        getTestName(),
+                        logger.getName(),
+                        Level.WARN,
+                        "skipping auto-follower coordination",
+                        ElasticsearchSecurityException.class,
+                        "current license is non-compliant for [ccr]"));
+        Loggers.addAppender(logger, appender);
+        try {
+            assertBusy(appender::assertAllExpectationsMatched);
+        } finally {
+            Loggers.removeAppender(logger, appender);
+            appender.stop();
+        }
+    }
+
+    private void assertNonCompliantLicense(final Exception e) {
         assertThat(e, instanceOf(ElasticsearchSecurityException.class));
         assertThat(e.getMessage(), equalTo("current license is non-compliant for [ccr]"));
     }

+ 3 - 3
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IncompatibleLicenseLocalStateCcr.java → x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/NonCompliantLicenseLocalStateCcr.java

@@ -12,16 +12,16 @@ import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
 
 import java.nio.file.Path;
 
-public class IncompatibleLicenseLocalStateCcr extends LocalStateCompositeXPackPlugin {
+public class NonCompliantLicenseLocalStateCcr extends LocalStateCompositeXPackPlugin {
 
-    public IncompatibleLicenseLocalStateCcr(final Settings settings, final Path configPath) throws Exception {
+    public NonCompliantLicenseLocalStateCcr(final Settings settings, final Path configPath) throws Exception {
         super(settings, configPath);
 
         plugins.add(new Ccr(settings, new CcrLicenseChecker(() -> false)) {
 
             @Override
             protected XPackLicenseState getLicenseState() {
-                return IncompatibleLicenseLocalStateCcr.this.getLicenseState();
+                return NonCompliantLicenseLocalStateCcr.this.getLicenseState();
             }
 
         });

+ 8 - 8
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java

@@ -66,9 +66,9 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
             invoked[0] = true;
             assertThat(e, nullValue());
         };
-        AutoFollower autoFollower = new AutoFollower(client, handler, currentState) {
+        AutoFollower autoFollower = new AutoFollower(handler, currentState) {
             @Override
-            void getLeaderClusterState(Client leaderClient, BiConsumer<ClusterState, Exception> handler) {
+            void getLeaderClusterState(String leaderClusterAlias, BiConsumer<ClusterState, Exception> handler) {
                 handler.accept(leaderState, null);
             }
 
@@ -113,9 +113,9 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
             invoked[0] = true;
             assertThat(e, sameInstance(failure));
         };
-        AutoFollower autoFollower = new AutoFollower(client, handler, followerState) {
+        AutoFollower autoFollower = new AutoFollower(handler, followerState) {
             @Override
-            void getLeaderClusterState(Client leaderClient, BiConsumer<ClusterState, Exception> handler) {
+            void getLeaderClusterState(String leaderClusterAlias, BiConsumer<ClusterState, Exception> handler) {
                 handler.accept(null, failure);
             }
 
@@ -161,9 +161,9 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
             invoked[0] = true;
             assertThat(e, sameInstance(failure));
         };
-        AutoFollower autoFollower = new AutoFollower(client, handler, followerState) {
+        AutoFollower autoFollower = new AutoFollower(handler, followerState) {
             @Override
-            void getLeaderClusterState(Client leaderClient, BiConsumer<ClusterState, Exception> handler) {
+            void getLeaderClusterState(String leaderClusterAlias, BiConsumer<ClusterState, Exception> handler) {
                 handler.accept(leaderState, null);
             }
 
@@ -211,9 +211,9 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
             invoked[0] = true;
             assertThat(e, sameInstance(failure));
         };
-        AutoFollower autoFollower = new AutoFollower(client, handler, followerState) {
+        AutoFollower autoFollower = new AutoFollower(handler, followerState) {
             @Override
-            void getLeaderClusterState(Client leaderClient, BiConsumer<ClusterState, Exception> handler) {
+            void getLeaderClusterState(String leaderClusterAlias, BiConsumer<ClusterState, Exception> handler) {
                 handler.accept(leaderState, null);
             }