Pārlūkot izejas kodu

[CCR] Move headers from auto follow pattern to auto follow metadata (#33846)

This ensures that we will not serialize the headers as part of the
auto follow pattern in the to be added get auto follow api.
Martijn van Groningen 7 gadi atpakaļ
vecāks
revīzija
e1e5f40727

+ 19 - 11
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java

@@ -254,7 +254,8 @@ public class AutoFollowCoordinator implements ClusterStateApplier {
                 final String clusterAlias = entry.getKey();
                 final AutoFollowPattern autoFollowPattern = entry.getValue();
 
-                getLeaderClusterState(autoFollowPattern.getHeaders(), clusterAlias, (leaderClusterState, e) -> {
+                Map<String, String> headers = autoFollowMetadata.getHeaders().get(clusterAlias);
+                getLeaderClusterState(headers, clusterAlias, (leaderClusterState, e) -> {
                     if (leaderClusterState != null) {
                         assert e == null;
                         final List<String> followedIndices = autoFollowMetadata.getFollowedLeaderIndexUUIDs().get(clusterAlias);
@@ -264,7 +265,7 @@ public class AutoFollowCoordinator implements ClusterStateApplier {
                             finalise(slot, new AutoFollowResult(clusterAlias));
                         } else {
                             Consumer<AutoFollowResult> resultHandler = result -> finalise(slot, result);
-                            checkAutoFollowPattern(clusterAlias, autoFollowPattern, leaderIndicesToFollow, resultHandler);
+                            checkAutoFollowPattern(clusterAlias, autoFollowPattern, leaderIndicesToFollow, headers, resultHandler);
                         }
                     } else {
                         finalise(slot, new AutoFollowResult(clusterAlias, e));
@@ -274,15 +275,18 @@ public class AutoFollowCoordinator implements ClusterStateApplier {
             }
         }
 
-        private void checkAutoFollowPattern(String clusterAlias, AutoFollowPattern autoFollowPattern,
-                                            List<Index> leaderIndicesToFollow, Consumer<AutoFollowResult> resultHandler) {
+        private void checkAutoFollowPattern(String clusterAlias,
+                                            AutoFollowPattern autoFollowPattern,
+                                            List<Index> leaderIndicesToFollow,
+                                            Map<String, String> headers,
+                                            Consumer<AutoFollowResult> resultHandler) {
 
             final CountDown leaderIndicesCountDown = new CountDown(leaderIndicesToFollow.size());
             final AtomicArray<Tuple<Index, Exception>> results = new AtomicArray<>(leaderIndicesToFollow.size());
             for (int i = 0; i < leaderIndicesToFollow.size(); i++) {
                 final Index indexToFollow = leaderIndicesToFollow.get(i);
                 final int slot = i;
-                followLeaderIndex(clusterAlias, indexToFollow, autoFollowPattern, error -> {
+                followLeaderIndex(clusterAlias, indexToFollow, autoFollowPattern, headers, error -> {
                     results.set(slot, new Tuple<>(indexToFollow, error));
                     if (leaderIndicesCountDown.countDown()) {
                         resultHandler.accept(new AutoFollowResult(clusterAlias, results.asList()));
@@ -291,8 +295,11 @@ public class AutoFollowCoordinator implements ClusterStateApplier {
             }
         }
 
-        private void followLeaderIndex(String clusterAlias, Index indexToFollow,
-                                       AutoFollowPattern pattern, Consumer<Exception> onResult) {
+        private void followLeaderIndex(String clusterAlias,
+                                       Index indexToFollow,
+                                       AutoFollowPattern pattern,
+                                       Map<String,String> headers,
+                                       Consumer<Exception> onResult) {
             final String leaderIndexName = indexToFollow.getName();
             final String followIndexName = getFollowerIndexName(pattern, leaderIndexName);
 
@@ -319,7 +326,7 @@ public class AutoFollowCoordinator implements ClusterStateApplier {
                 // The coordinator always runs on the elected master node, so we can update cluster state here:
                 updateAutoFollowMetadata(function, onResult);
             };
-            createAndFollow(pattern.getHeaders(), request, successHandler, onResult);
+            createAndFollow(headers, request, successHandler, onResult);
         }
 
         private void finalise(int slot, AutoFollowResult result) {
@@ -357,7 +364,8 @@ public class AutoFollowCoordinator implements ClusterStateApplier {
             }
         }
 
-        static Function<ClusterState, ClusterState> recordLeaderIndexAsFollowFunction(String clusterAlias, Index indexToFollow) {
+        static Function<ClusterState, ClusterState> recordLeaderIndexAsFollowFunction(String clusterAlias,
+                                                                                      Index indexToFollow) {
             return currentState -> {
                 AutoFollowMetadata currentAutoFollowMetadata = currentState.metaData().custom(AutoFollowMetadata.TYPE);
 
@@ -366,8 +374,8 @@ public class AutoFollowCoordinator implements ClusterStateApplier {
                 newFollowedIndexUUIDS.get(clusterAlias).add(indexToFollow.getUUID());
 
                 ClusterState.Builder newState = ClusterState.builder(currentState);
-                AutoFollowMetadata newAutoFollowMetadata =
-                    new AutoFollowMetadata(currentAutoFollowMetadata.getPatterns(), newFollowedIndexUUIDS);
+                AutoFollowMetadata newAutoFollowMetadata = new AutoFollowMetadata(currentAutoFollowMetadata.getPatterns(),
+                    newFollowedIndexUUIDS, currentAutoFollowMetadata.getHeaders());
                 newState.metaData(MetaData.builder(currentState.getMetaData())
                     .putCustom(AutoFollowMetadata.TYPE, newAutoFollowMetadata)
                     .build());

+ 3 - 1
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportDeleteAutoFollowPatternAction.java

@@ -85,10 +85,12 @@ public class TransportDeleteAutoFollowPatternAction extends
         final Map<String, AutoFollowPattern> patternsCopy = new HashMap<>(patterns);
         final Map<String, List<String>> followedLeaderIndexUUIDSCopy =
             new HashMap<>(currentAutoFollowMetadata.getFollowedLeaderIndexUUIDs());
+        final Map<String, Map<String, String>> headers = new HashMap<>(currentAutoFollowMetadata.getHeaders());
         patternsCopy.remove(request.getLeaderClusterAlias());
         followedLeaderIndexUUIDSCopy.remove(request.getLeaderClusterAlias());
+        headers.remove(request.getLeaderClusterAlias());
 
-        AutoFollowMetadata newAutoFollowMetadata = new AutoFollowMetadata(patternsCopy, followedLeaderIndexUUIDSCopy);
+        AutoFollowMetadata newAutoFollowMetadata = new AutoFollowMetadata(patternsCopy, followedLeaderIndexUUIDSCopy, headers);
         ClusterState.Builder newState = ClusterState.builder(currentState);
         newState.metaData(MetaData.builder(currentState.getMetaData())
             .putCustom(AutoFollowMetadata.TYPE, newAutoFollowMetadata)

+ 9 - 3
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java

@@ -123,12 +123,15 @@ public class TransportPutAutoFollowPatternAction extends
         AutoFollowMetadata currentAutoFollowMetadata = localState.metaData().custom(AutoFollowMetadata.TYPE);
         Map<String, List<String>> followedLeaderIndices;
         Map<String, AutoFollowPattern> patterns;
+        Map<String, Map<String, String>> headers;
         if (currentAutoFollowMetadata != null) {
             patterns = new HashMap<>(currentAutoFollowMetadata.getPatterns());
             followedLeaderIndices = new HashMap<>(currentAutoFollowMetadata.getFollowedLeaderIndexUUIDs());
+            headers = new HashMap<>(currentAutoFollowMetadata.getHeaders());
         } else {
             patterns = new HashMap<>();
             followedLeaderIndices = new HashMap<>();
+            headers = new HashMap<>();
         }
 
         AutoFollowPattern previousPattern = patterns.get(request.getLeaderClusterAlias());
@@ -147,6 +150,10 @@ public class TransportPutAutoFollowPatternAction extends
                 followedIndexUUIDs);
         }
 
+        if (filteredHeaders != null) {
+            headers.put(request.getLeaderClusterAlias(), filteredHeaders);
+        }
+
         AutoFollowPattern autoFollowPattern = new AutoFollowPattern(
             request.getLeaderIndexPatterns(),
             request.getFollowIndexNamePattern(),
@@ -156,12 +163,11 @@ public class TransportPutAutoFollowPatternAction extends
             request.getMaxConcurrentWriteBatches(),
             request.getMaxWriteBufferSize(),
             request.getMaxRetryDelay(),
-            request.getPollTimeout(),
-            filteredHeaders);
+            request.getPollTimeout());
         patterns.put(request.getLeaderClusterAlias(), autoFollowPattern);
         ClusterState.Builder newState = ClusterState.builder(localState);
         newState.metaData(MetaData.builder(localState.getMetaData())
-            .putCustom(AutoFollowMetadata.TYPE, new AutoFollowMetadata(patterns, followedLeaderIndices))
+            .putCustom(AutoFollowMetadata.TYPE, new AutoFollowMetadata(patterns, followedLeaderIndices, headers))
             .build());
         return newState.build();
     }

+ 11 - 4
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowMetadataTests.java

@@ -13,7 +13,6 @@ import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
 
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -36,6 +35,7 @@ public class AutoFollowMetadataTests extends AbstractSerializingTestCase<AutoFol
         int numEntries = randomIntBetween(0, 32);
         Map<String, AutoFollowMetadata.AutoFollowPattern> configs = new HashMap<>(numEntries);
         Map<String, List<String>> followedLeaderIndices = new HashMap<>(numEntries);
+        Map<String, Map<String, String>> headers = new HashMap<>(numEntries);
         for (int i = 0; i < numEntries; i++) {
             List<String> leaderPatterns = Arrays.asList(generateRandomStringArray(4, 4, false));
             AutoFollowMetadata.AutoFollowPattern autoFollowPattern = new AutoFollowMetadata.AutoFollowPattern(
@@ -47,12 +47,19 @@ public class AutoFollowMetadataTests extends AbstractSerializingTestCase<AutoFol
                 randomIntBetween(0, Integer.MAX_VALUE),
                 randomIntBetween(0, Integer.MAX_VALUE),
                 TimeValue.timeValueMillis(500),
-                TimeValue.timeValueMillis(500),
-                randomBoolean() ? null : Collections.singletonMap("key", "value"));
+                TimeValue.timeValueMillis(500));
             configs.put(Integer.toString(i), autoFollowPattern);
             followedLeaderIndices.put(Integer.toString(i), Arrays.asList(generateRandomStringArray(4, 4, false)));
+            if (randomBoolean()) {
+                int numHeaderEntries = randomIntBetween(1, 16);
+                Map<String, String> header = new HashMap<>();
+                for (int j = 0; j < numHeaderEntries; j++) {
+                    header.put(randomAlphaOfLength(5), randomAlphaOfLength(5));
+                }
+                headers.put(Integer.toString(i), header);
+            }
         }
-        return new AutoFollowMetadata(configs, followedLeaderIndices);
+        return new AutoFollowMetadata(configs, followedLeaderIndices, headers);
     }
 
     @Override

+ 3 - 3
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java

@@ -140,11 +140,11 @@ public class CcrLicenseIT extends ESSingleNodeTestCase {
             @Override
             public ClusterState execute(ClusterState currentState) throws Exception {
                 AutoFollowPattern autoFollowPattern =
-                    new AutoFollowPattern(Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null, null);
+                    new AutoFollowPattern(Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null);
                 AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(
                     Collections.singletonMap("test_alias", autoFollowPattern),
-                    Collections.emptyMap()
-                );
+                    Collections.emptyMap(),
+                    Collections.emptyMap());
 
                 ClusterState.Builder newState = ClusterState.builder(currentState);
                 newState.metaData(MetaData.builder(currentState.getMetaData())

+ 21 - 13
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java

@@ -56,12 +56,14 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
             .build();
 
         AutoFollowPattern autoFollowPattern =
-            new AutoFollowPattern(Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null, null);
+            new AutoFollowPattern(Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null);
         Map<String, AutoFollowPattern> patterns = new HashMap<>();
         patterns.put("remote", autoFollowPattern);
         Map<String, List<String>> followedLeaderIndexUUIDS = new HashMap<>();
         followedLeaderIndexUUIDS.put("remote", new ArrayList<>());
-        AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(patterns, followedLeaderIndexUUIDS);
+        Map<String, Map<String, String>> autoFollowHeaders = new HashMap<>();
+        autoFollowHeaders.put("remote", Collections.singletonMap("key", "val"));
+        AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(patterns, followedLeaderIndexUUIDS, autoFollowHeaders);
 
         ClusterState currentState = ClusterState.builder(new ClusterName("name"))
             .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, autoFollowMetadata))
@@ -83,6 +85,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
             void getLeaderClusterState(Map<String, String> headers,
                                        String leaderClusterAlias,
                                        BiConsumer<ClusterState, Exception> handler) {
+                assertThat(headers, sameInstance(autoFollowHeaders.get("remote")));
                 handler.accept(leaderState, null);
             }
 
@@ -91,6 +94,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
                                  FollowIndexAction.Request followRequest,
                                  Runnable successHandler,
                                  Consumer<Exception> failureHandler) {
+                assertThat(headers, sameInstance(autoFollowHeaders.get("remote")));
                 assertThat(followRequest.getLeaderIndex(), equalTo("remote:logs-20190101"));
                 assertThat(followRequest.getFollowerIndex(), equalTo("logs-20190101"));
                 successHandler.run();
@@ -115,12 +119,13 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
         when(client.getRemoteClusterClient(anyString())).thenReturn(client);
 
         AutoFollowPattern autoFollowPattern =
-            new AutoFollowPattern(Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null, null);
+            new AutoFollowPattern(Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null);
         Map<String, AutoFollowPattern> patterns = new HashMap<>();
         patterns.put("remote", autoFollowPattern);
         Map<String, List<String>> followedLeaderIndexUUIDS = new HashMap<>();
         followedLeaderIndexUUIDS.put("remote", new ArrayList<>());
-        AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(patterns, followedLeaderIndexUUIDS);
+        Map<String, Map<String, String>> headers = new HashMap<>();
+        AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(patterns, followedLeaderIndexUUIDS, headers);
         ClusterState followerState = ClusterState.builder(new ClusterName("remote"))
             .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, autoFollowMetadata))
             .build();
@@ -172,12 +177,13 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
             .build();
 
         AutoFollowPattern autoFollowPattern =
-            new AutoFollowPattern(Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null, null);
+            new AutoFollowPattern(Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null);
         Map<String, AutoFollowPattern> patterns = new HashMap<>();
         patterns.put("remote", autoFollowPattern);
         Map<String, List<String>> followedLeaderIndexUUIDS = new HashMap<>();
         followedLeaderIndexUUIDS.put("remote", new ArrayList<>());
-        AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(patterns, followedLeaderIndexUUIDS);
+        Map<String, Map<String, String>> headers = new HashMap<>();
+        AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(patterns, followedLeaderIndexUUIDS, headers);
         ClusterState followerState = ClusterState.builder(new ClusterName("remote"))
             .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, autoFollowMetadata))
             .build();
@@ -233,12 +239,13 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
             .build();
 
         AutoFollowPattern autoFollowPattern =
-            new AutoFollowPattern(Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null, null);
+            new AutoFollowPattern(Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null);
         Map<String, AutoFollowPattern> patterns = new HashMap<>();
         patterns.put("remote", autoFollowPattern);
         Map<String, List<String>> followedLeaderIndexUUIDS = new HashMap<>();
         followedLeaderIndexUUIDS.put("remote", new ArrayList<>());
-        AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(patterns, followedLeaderIndexUUIDS);
+        Map<String, Map<String, String>> headers = new HashMap<>();
+        AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(patterns, followedLeaderIndexUUIDS, headers);
         ClusterState followerState = ClusterState.builder(new ClusterName("remote"))
             .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, autoFollowMetadata))
             .build();
@@ -285,10 +292,11 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
 
     public void testGetLeaderIndicesToFollow() {
         AutoFollowPattern autoFollowPattern =
-            new AutoFollowPattern(Collections.singletonList("metrics-*"), null, null, null, null, null, null, null, null, null);
+            new AutoFollowPattern(Collections.singletonList("metrics-*"), null, null, null, null, null, null, null, null);
+        Map<String, Map<String, String>> headers = new HashMap<>();
         ClusterState followerState = ClusterState.builder(new ClusterName("remote"))
             .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE,
-                new AutoFollowMetadata(Collections.singletonMap("remote", autoFollowPattern), Collections.emptyMap())))
+                new AutoFollowMetadata(Collections.singletonMap("remote", autoFollowPattern), Collections.emptyMap(), headers)))
             .build();
 
         MetaData.Builder imdBuilder = MetaData.builder();
@@ -331,15 +339,15 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
 
     public void testGetFollowerIndexName() {
         AutoFollowPattern autoFollowPattern = new AutoFollowPattern(Collections.singletonList("metrics-*"), null, null,
-            null, null, null, null, null, null, null);
+            null, null, null, null, null, null);
         assertThat(AutoFollower.getFollowerIndexName(autoFollowPattern, "metrics-0"), equalTo("metrics-0"));
 
         autoFollowPattern = new AutoFollowPattern(Collections.singletonList("metrics-*"), "eu-metrics-0", null, null,
-            null, null, null, null, null, null);
+            null, null, null, null, null);
         assertThat(AutoFollower.getFollowerIndexName(autoFollowPattern, "metrics-0"), equalTo("eu-metrics-0"));
 
         autoFollowPattern = new AutoFollowPattern(Collections.singletonList("metrics-*"), "eu-{{leader_index}}", null,
-            null, null, null, null, null, null, null);
+            null, null, null, null, null, null);
         assertThat(AutoFollower.getFollowerIndexName(autoFollowPattern, "metrics-0"), equalTo("eu-metrics-0"));
     }
 

+ 13 - 5
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportDeleteAutoFollowPatternActionTests.java

@@ -14,6 +14,7 @@ import org.elasticsearch.xpack.core.ccr.action.DeleteAutoFollowPatternAction.Req
 import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -25,30 +26,33 @@ public class TransportDeleteAutoFollowPatternActionTests extends ESTestCase {
 
     public void testInnerDelete() {
         Map<String, List<String>> existingAlreadyFollowedIndexUUIDS = new HashMap<>();
+        Map<String, Map<String, String>> existingHeaders = new HashMap<>();
         Map<String, AutoFollowMetadata.AutoFollowPattern> existingAutoFollowPatterns = new HashMap<>();
         {
             List<String> existingPatterns = new ArrayList<>();
             existingPatterns.add("transactions-*");
             existingAutoFollowPatterns.put("eu_cluster",
-                new AutoFollowMetadata.AutoFollowPattern(existingPatterns, null, null, null, null, null, null, null, null, null));
+                new AutoFollowMetadata.AutoFollowPattern(existingPatterns, null, null, null, null, null, null, null, null));
 
             List<String> existingUUIDS = new ArrayList<>();
             existingUUIDS.add("_val");
             existingAlreadyFollowedIndexUUIDS.put("eu_cluster", existingUUIDS);
+            existingHeaders.put("eu_cluster", Collections.singletonMap("key", "val"));
         }
         {
             List<String> existingPatterns = new ArrayList<>();
             existingPatterns.add("logs-*");
             existingAutoFollowPatterns.put("asia_cluster",
-                new AutoFollowMetadata.AutoFollowPattern(existingPatterns, null, null, null, null, null, null, null, null, null));
+                new AutoFollowMetadata.AutoFollowPattern(existingPatterns, null, null, null, null, null, null, null, null));
 
             List<String> existingUUIDS = new ArrayList<>();
             existingUUIDS.add("_val");
             existingAlreadyFollowedIndexUUIDS.put("asia_cluster", existingUUIDS);
+            existingHeaders.put("asia_cluster", Collections.singletonMap("key", "val"));
         }
         ClusterState clusterState = ClusterState.builder(new ClusterName("us_cluster"))
             .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE,
-                new AutoFollowMetadata(existingAutoFollowPatterns, existingAlreadyFollowedIndexUUIDS)))
+                new AutoFollowMetadata(existingAutoFollowPatterns, existingAlreadyFollowedIndexUUIDS, existingHeaders)))
             .build();
 
         Request request = new Request();
@@ -60,20 +64,24 @@ public class TransportDeleteAutoFollowPatternActionTests extends ESTestCase {
         assertThat(result.getPatterns().get("asia_cluster"), notNullValue());
         assertThat(result.getFollowedLeaderIndexUUIDs().size(), equalTo(1));
         assertThat(result.getFollowedLeaderIndexUUIDs().get("asia_cluster"), notNullValue());
+        assertThat(result.getHeaders().size(), equalTo(1));
+        assertThat(result.getHeaders().get("asia_cluster"), notNullValue());
     }
 
     public void testInnerDeleteDoesNotExist() {
         Map<String, List<String>> existingAlreadyFollowedIndexUUIDS = new HashMap<>();
         Map<String, AutoFollowMetadata.AutoFollowPattern> existingAutoFollowPatterns = new HashMap<>();
+        Map<String, Map<String, String>> existingHeaders = new HashMap<>();
         {
             List<String> existingPatterns = new ArrayList<>();
             existingPatterns.add("transactions-*");
             existingAutoFollowPatterns.put("eu_cluster",
-                new AutoFollowMetadata.AutoFollowPattern(existingPatterns, null, null, null, null, null, null, null, null, null));
+                new AutoFollowMetadata.AutoFollowPattern(existingPatterns, null, null, null, null, null, null, null, null));
+            existingHeaders.put("key", Collections.singletonMap("key", "val"));
         }
         ClusterState clusterState = ClusterState.builder(new ClusterName("us_cluster"))
             .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE,
-                new AutoFollowMetadata(existingAutoFollowPatterns, existingAlreadyFollowedIndexUUIDS)))
+                new AutoFollowMetadata(existingAutoFollowPatterns, existingAlreadyFollowedIndexUUIDS, existingHeaders)))
             .build();
 
         Request request = new Request();

+ 7 - 2
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternActionTests.java

@@ -97,14 +97,17 @@ public class TransportPutAutoFollowPatternActionTests extends ESTestCase {
         List<String> existingPatterns = new ArrayList<>();
         existingPatterns.add("transactions-*");
         existingAutoFollowPatterns.put("eu_cluster",
-            new AutoFollowMetadata.AutoFollowPattern(existingPatterns, null, null, null, null, null, null, null, null, null));
+            new AutoFollowMetadata.AutoFollowPattern(existingPatterns, null, null, null, null, null, null, null, null));
         Map<String, List<String>> existingAlreadyFollowedIndexUUIDS = new HashMap<>();
         List<String> existingUUIDS = new ArrayList<>();
         existingUUIDS.add("_val");
         existingAlreadyFollowedIndexUUIDS.put("eu_cluster", existingUUIDS);
+        Map<String, Map<String, String>> existingHeaders = new HashMap<>();
+        existingHeaders.put("eu_cluster", Collections.singletonMap("key", "val"));
+
         ClusterState localState = ClusterState.builder(new ClusterName("us_cluster"))
             .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE,
-                new AutoFollowMetadata(existingAutoFollowPatterns, existingAlreadyFollowedIndexUUIDS)))
+                new AutoFollowMetadata(existingAutoFollowPatterns, existingAlreadyFollowedIndexUUIDS, existingHeaders)))
             .build();
 
         int numLeaderIndices = randomIntBetween(1, 8);
@@ -129,6 +132,8 @@ public class TransportPutAutoFollowPatternActionTests extends ESTestCase {
         assertThat(autoFollowMetadata.getPatterns().get("eu_cluster").getLeaderIndexPatterns().get(1), equalTo("transactions-*"));
         assertThat(autoFollowMetadata.getFollowedLeaderIndexUUIDs().size(), equalTo(1));
         assertThat(autoFollowMetadata.getFollowedLeaderIndexUUIDs().get("eu_cluster").size(), equalTo(numLeaderIndices + 1));
+        assertThat(autoFollowMetadata.getHeaders().size(), equalTo(1));
+        assertThat(autoFollowMetadata.getHeaders().get("eu_cluster"), notNullValue());
     }
 
 }

+ 33 - 41
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowMetadata.java

@@ -21,10 +21,8 @@ import org.elasticsearch.common.xcontent.ToXContentObject;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.xpack.core.XPackPlugin;
-import org.elasticsearch.xpack.core.security.xcontent.XContentUtils;
 
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
@@ -41,10 +39,15 @@ public class AutoFollowMetadata extends AbstractNamedDiffable<MetaData.Custom> i
 
     private static final ParseField PATTERNS_FIELD = new ParseField("patterns");
     private static final ParseField FOLLOWED_LEADER_INDICES_FIELD = new ParseField("followed_leader_indices");
+    private static final ParseField HEADERS = new ParseField("headers");
 
     @SuppressWarnings("unchecked")
     private static final ConstructingObjectParser<AutoFollowMetadata, Void> PARSER = new ConstructingObjectParser<>("auto_follow",
-        args -> new AutoFollowMetadata((Map<String, AutoFollowPattern>) args[0], (Map<String, List<String>>) args[1]));
+        args -> new AutoFollowMetadata(
+            (Map<String, AutoFollowPattern>) args[0],
+            (Map<String, List<String>>) args[1],
+            (Map<String, Map<String, String>>) args[2]
+        ));
 
     static {
         PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> {
@@ -61,20 +64,8 @@ public class AutoFollowMetadata extends AbstractNamedDiffable<MetaData.Custom> i
             }
             return patterns;
         }, PATTERNS_FIELD);
-        PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> {
-            Map<String, List<String>> alreadyFollowedIndexUUIDS = new HashMap<>();
-            String fieldName = null;
-            for (XContentParser.Token token = p.nextToken(); token != XContentParser.Token.END_OBJECT; token = p.nextToken()) {
-                if (token == XContentParser.Token.FIELD_NAME) {
-                    fieldName = p.currentName();
-                } else if (token == XContentParser.Token.START_ARRAY) {
-                    alreadyFollowedIndexUUIDS.put(fieldName, Arrays.asList(XContentUtils.readStringArray(p, false)));
-                } else {
-                    throw new ElasticsearchParseException("unexpected token [" + token + "]");
-                }
-            }
-            return alreadyFollowedIndexUUIDS;
-        }, FOLLOWED_LEADER_INDICES_FIELD);
+        PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> p.map(), FOLLOWED_LEADER_INDICES_FIELD);
+        PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> p.map(), HEADERS);
     }
 
     public static AutoFollowMetadata fromXContent(XContentParser parser) throws IOException {
@@ -83,15 +74,21 @@ public class AutoFollowMetadata extends AbstractNamedDiffable<MetaData.Custom> i
 
     private final Map<String, AutoFollowPattern> patterns;
     private final Map<String, List<String>> followedLeaderIndexUUIDs;
+    private final Map<String, Map<String, String>> headers;
 
-    public AutoFollowMetadata(Map<String, AutoFollowPattern> patterns, Map<String, List<String>> followedLeaderIndexUUIDs) {
+    public AutoFollowMetadata(Map<String, AutoFollowPattern> patterns,
+                              Map<String, List<String>> followedLeaderIndexUUIDs,
+                              Map<String, Map<String, String>> headers) {
         this.patterns = patterns;
         this.followedLeaderIndexUUIDs = followedLeaderIndexUUIDs;
+        this.headers = Collections.unmodifiableMap(headers);
     }
 
     public AutoFollowMetadata(StreamInput in) throws IOException {
         patterns = in.readMap(StreamInput::readString, AutoFollowPattern::new);
         followedLeaderIndexUUIDs = in.readMapOfLists(StreamInput::readString, StreamInput::readString);
+        headers = Collections.unmodifiableMap(in.readMap(StreamInput::readString,
+            valIn -> Collections.unmodifiableMap(valIn.readMap(StreamInput::readString, StreamInput::readString))));
     }
 
     public Map<String, AutoFollowPattern> getPatterns() {
@@ -102,11 +99,14 @@ public class AutoFollowMetadata extends AbstractNamedDiffable<MetaData.Custom> i
         return followedLeaderIndexUUIDs;
     }
 
+    public Map<String, Map<String, String>> getHeaders() {
+        return headers;
+    }
+
     @Override
     public EnumSet<MetaData.XContentContext> context() {
-        // TODO: When a snapshot is restored do we want to restore this?
-        // (Otherwise we would start following indices automatically immediately)
-        return MetaData.ALL_CONTEXTS;
+        // No XContentContext.API, because the headers should not be serialized as part of clusters state api
+        return EnumSet.of(MetaData.XContentContext.SNAPSHOT, MetaData.XContentContext.GATEWAY);
     }
 
     @Override
@@ -123,6 +123,8 @@ public class AutoFollowMetadata extends AbstractNamedDiffable<MetaData.Custom> i
     public void writeTo(StreamOutput out) throws IOException {
         out.writeMap(patterns, StreamOutput::writeString, (out1, value) -> value.writeTo(out1));
         out.writeMapOfLists(followedLeaderIndexUUIDs, StreamOutput::writeString, StreamOutput::writeString);
+        out.writeMap(headers, StreamOutput::writeString,
+            (valOut, header) -> valOut.writeMap(header, StreamOutput::writeString, StreamOutput::writeString));
     }
 
     @Override
@@ -140,6 +142,11 @@ public class AutoFollowMetadata extends AbstractNamedDiffable<MetaData.Custom> i
             builder.field(entry.getKey(), entry.getValue());
         }
         builder.endObject();
+        builder.startObject(HEADERS.getPreferredName());
+        for (Map.Entry<String, Map<String, String>> entry : headers.entrySet()) {
+            builder.field(entry.getKey(), entry.getValue());
+        }
+        builder.endObject();
         return builder;
     }
 
@@ -172,14 +179,12 @@ public class AutoFollowMetadata extends AbstractNamedDiffable<MetaData.Custom> i
         public static final ParseField MAX_WRITE_BUFFER_SIZE = new ParseField("max_write_buffer_size");
         public static final ParseField MAX_RETRY_DELAY = new ParseField("max_retry_delay");
         public static final ParseField POLL_TIMEOUT = new ParseField("poll_timeout");
-        private static final ParseField HEADERS = new ParseField("headers");
 
         @SuppressWarnings("unchecked")
         private static final ConstructingObjectParser<AutoFollowPattern, Void> PARSER =
             new ConstructingObjectParser<>("auto_follow_pattern",
                 args -> new AutoFollowPattern((List<String>) args[0], (String) args[1], (Integer) args[2], (Integer) args[3],
-                    (Long) args[4], (Integer) args[5], (Integer) args[6], (TimeValue) args[7], (TimeValue) args[8],
-                    (Map<String, String>) args[9]));
+                    (Long) args[4], (Integer) args[5], (Integer) args[6], (TimeValue) args[7], (TimeValue) args[8]));
 
         static {
             PARSER.declareStringArray(ConstructingObjectParser.constructorArg(), LEADER_PATTERNS_FIELD);
@@ -195,7 +200,6 @@ public class AutoFollowMetadata extends AbstractNamedDiffable<MetaData.Custom> i
             PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(),
                 (p, c) -> TimeValue.parseTimeValue(p.text(), POLL_TIMEOUT.getPreferredName()),
                 POLL_TIMEOUT, ObjectParser.ValueType.STRING);
-            PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> p.mapStrings(), HEADERS);
         }
 
         private final List<String> leaderIndexPatterns;
@@ -207,7 +211,6 @@ public class AutoFollowMetadata extends AbstractNamedDiffable<MetaData.Custom> i
         private final Integer maxWriteBufferSize;
         private final TimeValue maxRetryDelay;
         private final TimeValue pollTimeout;
-        private final Map<String, String> headers;
 
         public AutoFollowPattern(List<String> leaderIndexPatterns,
                                  String followIndexPattern,
@@ -217,8 +220,7 @@ public class AutoFollowMetadata extends AbstractNamedDiffable<MetaData.Custom> i
                                  Integer maxConcurrentWriteBatches,
                                  Integer maxWriteBufferSize,
                                  TimeValue maxRetryDelay,
-                                 TimeValue pollTimeout,
-                                 Map<String, String> headers) {
+                                 TimeValue pollTimeout) {
             this.leaderIndexPatterns = leaderIndexPatterns;
             this.followIndexPattern = followIndexPattern;
             this.maxBatchOperationCount = maxBatchOperationCount;
@@ -228,10 +230,9 @@ public class AutoFollowMetadata extends AbstractNamedDiffable<MetaData.Custom> i
             this.maxWriteBufferSize = maxWriteBufferSize;
             this.maxRetryDelay = maxRetryDelay;
             this.pollTimeout = pollTimeout;
-            this.headers = headers != null ? Collections.unmodifiableMap(headers) : Collections.emptyMap();
         }
 
-        AutoFollowPattern(StreamInput in) throws IOException {
+        public AutoFollowPattern(StreamInput in) throws IOException {
             leaderIndexPatterns = in.readList(StreamInput::readString);
             followIndexPattern = in.readOptionalString();
             maxBatchOperationCount = in.readOptionalVInt();
@@ -241,7 +242,6 @@ public class AutoFollowMetadata extends AbstractNamedDiffable<MetaData.Custom> i
             maxWriteBufferSize = in.readOptionalVInt();
             maxRetryDelay = in.readOptionalTimeValue();
             pollTimeout = in.readOptionalTimeValue();
-            this.headers = Collections.unmodifiableMap(in.readMap(StreamInput::readString, StreamInput::readString));
         }
 
         public boolean match(String indexName) {
@@ -288,10 +288,6 @@ public class AutoFollowMetadata extends AbstractNamedDiffable<MetaData.Custom> i
             return pollTimeout;
         }
 
-        public Map<String, String> getHeaders() {
-            return headers;
-        }
-
         @Override
         public void writeTo(StreamOutput out) throws IOException {
             out.writeStringList(leaderIndexPatterns);
@@ -303,7 +299,6 @@ public class AutoFollowMetadata extends AbstractNamedDiffable<MetaData.Custom> i
             out.writeOptionalVInt(maxWriteBufferSize);
             out.writeOptionalTimeValue(maxRetryDelay);
             out.writeOptionalTimeValue(pollTimeout);
-            out.writeMap(headers, StreamOutput::writeString, StreamOutput::writeString);
         }
 
         @Override
@@ -333,7 +328,6 @@ public class AutoFollowMetadata extends AbstractNamedDiffable<MetaData.Custom> i
             if (pollTimeout != null) {
                 builder.field(POLL_TIMEOUT.getPreferredName(), pollTimeout);
             }
-            builder.field(HEADERS.getPreferredName(), headers);
             return builder;
         }
 
@@ -355,8 +349,7 @@ public class AutoFollowMetadata extends AbstractNamedDiffable<MetaData.Custom> i
                 Objects.equals(maxConcurrentWriteBatches, that.maxConcurrentWriteBatches) &&
                 Objects.equals(maxWriteBufferSize, that.maxWriteBufferSize) &&
                 Objects.equals(maxRetryDelay, that.maxRetryDelay) &&
-                Objects.equals(pollTimeout, that.pollTimeout) &&
-                Objects.equals(headers, that.headers);
+                Objects.equals(pollTimeout, that.pollTimeout);
         }
 
         @Override
@@ -370,8 +363,7 @@ public class AutoFollowMetadata extends AbstractNamedDiffable<MetaData.Custom> i
                 maxConcurrentWriteBatches,
                 maxWriteBufferSize,
                 maxRetryDelay,
-                pollTimeout,
-                headers
+                pollTimeout
             );
         }
     }