Переглянути джерело

Fix renaming data streams with CCR replication (#88875)

This commit fixes the situation where a user wants to use CCR to replicate indices that are part of
a data stream while renaming the data stream. For example, assume a user has an auto-follow request
that looks like this:

```
PUT /_ccr/auto_follow/my-auto-follow-pattern
{
  "remote_cluster" : "other-cluster",
  "leader_index_patterns" : ["logs-*"],
  "follow_index_pattern" : "{{leader_index}}_copy"
}
```

And then the data stream `logs-mysql-error` was created, creating the backing index
`.ds-logs-mysql-error-2022-07-29-000001`.

Prior to this commit, replicating this data stream means that the backing index would be renamed to
`.ds-logs-mysql-error-2022-07-29-000001_copy` and the data stream would *not* be renamed. This
caused a check to trip in `TransportPutLifecycleAction` asserting that a backing index was not
renamed for a data stream during following.

After this commit, there are a couple of changes:

First, the data stream will also be renamed. This means that the `logs-mysql-error` becomes
`logs-mysql-error_copy` when created on the follower cluster. Because of the way that CCR works,
this means we need to support renaming a data stream for a regular "create follower" request, so a
new parameter has been added: `data_stream_name`. It works like this:

```
PUT /mynewindex/_ccr/follow
{
  "remote_cluster": "other-cluster",
  "leader_index": "myotherindex",
  "data_stream_name": "new_ds"
}
```

Second, the backing index for a data stream must be renamed in a way that does not break the parsing
of a data stream backing pattern, whereas previously the index
`.ds-logs-mysql-error-2022-07-29-000001` would be renamed to
`.ds-logs-mysql-error-2022-07-29-000001_copy` (an illegal name since it doesn't end with the
rollover digit), after this commit it will be renamed to
`.ds-logs-mysql-error_copy-2022-07-29-000001` to match the renamed data stream. This means that for
the given `follow_index_pattern` of `{{leader_index}}_copy` the index changes look like:

| Leader Cluster | Follower Cluster |
|--------------|-----------|
| `logs-mysql-error` (data stream) | `logs-mysql-error_copy` (data stream) |
| `.ds-logs-mysql-error-2022-07-29-000001`      | `.ds-logs-mysql-error_copy-2022-07-29-000001` |

Which internally means the auto-follow request turned into the create follower request of:

```
PUT /.ds-logs-mysql-error_copy-2022-07-29-000001/_ccr/follow
{
  "remote_cluster": "other-cluster",
  "leader_index": ".ds-logs-mysql-error-2022-07-29-000001",
  "data_stream_name": "logs-mysql-error_copy"
}
```

Relates to https://github.com/elastic/elasticsearch/pull/84940 (cherry-picked the commit for a test)
Relates to https://github.com/elastic/elasticsearch/pull/61993 (where data stream support was first introduced for CCR)
Resolves https://github.com/elastic/elasticsearch/issues/81751
Lee Hinman 3 роки тому
батько
коміт
3420be0ca5

+ 6 - 0
docs/changelog/88875.yaml

@@ -0,0 +1,6 @@
+pr: 88875
+summary: Fix renaming data streams with CCR replication
+area: "Data streams"
+type: bug
+issues:
+ - 81751

+ 8 - 5
docs/reference/ccr/apis/auto-follow/put-auto-follow-pattern.asciidoc

@@ -85,11 +85,14 @@ the new patterns.
   more `leader_index_patterns` and one or more `leader_index_exclusion_patterns` won't be followed.
 
 `follow_index_pattern`::
-  (Optional, string) The name of follower index. The template `{{leader_index}}`
-  can be used to derive the name of the follower index from the name of the
-  leader index. When following a data stream, use `{{leader_index}}`; {ccr-init}
-  does not support changes to the names of a follower data stream's backing
-  indices.
+  (Optional, string) The name of follower index. The template `{{leader_index}}` can be used to
+  derive the name of the follower index from the name of the leader index. When following a data
+  stream, the `follow_index_pattern` will be used for renaming not only the leader index, but also
+  the data stream containing the leader index. For example, a data stream called
+  `logs-mysql-default` with a backing index of `.ds-logs-mysql-default-2022-01-01-000001` and a
+  `follow_index_pattern` of `{{leader_index}}_copy` will replicate the data stream as
+  `logs-mysql-default_copy` and the backing index as
+  `.ds-logs-mysql-default_copy-2022-01-01-000001`.
 
 include::../follow-request-body.asciidoc[]
 

+ 20 - 0
docs/reference/ccr/apis/follow/put-follow.asciidoc

@@ -76,6 +76,26 @@ referenced leader index. When this API returns, the follower index exists, and
   (Required, string) The <<remote-clusters,remote cluster>> containing
   the leader index.
 
+[[ccr-put-follow-request-body-data_stream_name]]`data_stream_name`::
+  (Optional, string) If the leader index is part of a <<data-streams,data stream>>, the name to
+  which the local data stream for the followed index should be renamed. For example, A request like:
+
+[source,console]
+--------------------------------------------------
+PUT /.ds-logs-mysql-default_copy-2022-01-01-000001/_ccr/follow
+{
+  "remote_cluster" : "remote_cluster",
+  "leader_index" : ".ds-logs-mysql-default-2022-01-01-000001",
+  "data_stream_name": "logs-mysql-default_copy"
+}
+--------------------------------------------------
+// TEST[skip:no setup]
+
+Replicates the leader index `.ds-logs-mysql-default-2022-01-01-000001` into the follower index
+`.ds-logs-mysql-default_copy-2022-01-01-000001` and will do so using the data stream
+`logs-mysql-default_copy`, as opposed to the original leader data stream name of
+`logs-mysql-default`.
+
 include::../follow-request-body.asciidoc[]
 
 [[ccr-put-follow-examples]]

+ 122 - 7
x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java

@@ -236,7 +236,7 @@ public class AutoFollowIT extends ESCCRRestTestCase {
         int initialNumberOfSuccessfulFollowedIndices = getNumberOfSuccessfulFollowedIndices();
         try {
             // Create auto follow pattern
-            createAutoFollowPattern(client(), autoFollowPatternName, "logs-mysql-*", "leader_cluster");
+            createAutoFollowPattern(client(), autoFollowPatternName, "logs-mysql-*", "leader_cluster", null);
 
             // Create data stream and ensure that is is auto followed
             try (RestClient leaderClient = buildLeaderClient()) {
@@ -320,6 +320,121 @@ public class AutoFollowIT extends ESCCRRestTestCase {
         }
     }
 
+    public void testDataStreamsRenameFollowDataStream() throws Exception {
+        if ("follow".equals(targetCluster) == false) {
+            return;
+        }
+
+        final int numDocs = 64;
+        final String dataStreamName = "logs-mysql-error";
+        final String dataStreamNameFollower = "logs-mysql-error_copy";
+        final String autoFollowPatternName = getTestName().toLowerCase(Locale.ROOT);
+
+        int initialNumberOfSuccessfulFollowedIndices = getNumberOfSuccessfulFollowedIndices();
+        try {
+            // Create auto follow pattern
+            createAutoFollowPattern(client(), autoFollowPatternName, "logs-mysql-*", "leader_cluster", "{{leader_index}}_copy");
+
+            // Create data stream and ensure that is is auto followed
+            try (RestClient leaderClient = buildLeaderClient()) {
+                for (int i = 0; i < numDocs; i++) {
+                    Request indexRequest = new Request("POST", "/" + dataStreamName + "/_doc");
+                    indexRequest.addParameter("refresh", "true");
+                    indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}");
+                    assertOK(leaderClient.performRequest(indexRequest));
+                }
+                verifyDataStream(leaderClient, dataStreamName, backingIndexName(dataStreamName, 1));
+                verifyDocuments(leaderClient, dataStreamName, numDocs);
+            }
+            logger.info(
+                "--> checking {} with index {} has been auto followed to {} with backing index {}",
+                dataStreamName,
+                backingIndexName(dataStreamName, 1),
+                dataStreamNameFollower,
+                backingIndexName(dataStreamNameFollower, 1)
+            );
+            assertBusy(() -> {
+                assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 1));
+                verifyDataStream(client(), dataStreamNameFollower, backingIndexName(dataStreamNameFollower, 1));
+                ensureYellow(dataStreamNameFollower);
+                verifyDocuments(client(), dataStreamNameFollower, numDocs);
+            });
+
+            // First rollover and ensure second backing index is replicated:
+            logger.info("--> rolling over");
+            try (RestClient leaderClient = buildLeaderClient()) {
+                Request rolloverRequest = new Request("POST", "/" + dataStreamName + "/_rollover");
+                assertOK(leaderClient.performRequest(rolloverRequest));
+                verifyDataStream(leaderClient, dataStreamName, backingIndexName(dataStreamName, 1), backingIndexName(dataStreamName, 2));
+
+                Request indexRequest = new Request("POST", "/" + dataStreamName + "/_doc");
+                indexRequest.addParameter("refresh", "true");
+                indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}");
+                assertOK(leaderClient.performRequest(indexRequest));
+                verifyDocuments(leaderClient, dataStreamName, numDocs + 1);
+            }
+            assertBusy(() -> {
+                assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 2));
+                verifyDataStream(
+                    client(),
+                    dataStreamNameFollower,
+                    backingIndexName(dataStreamNameFollower, 1),
+                    backingIndexName(dataStreamNameFollower, 2)
+                );
+                ensureYellow(dataStreamNameFollower);
+                verifyDocuments(client(), dataStreamNameFollower, numDocs + 1);
+            });
+
+            // Second rollover and ensure third backing index is replicated:
+            logger.info("--> rolling over");
+            try (RestClient leaderClient = buildLeaderClient()) {
+                Request rolloverRequest = new Request("POST", "/" + dataStreamName + "/_rollover");
+                assertOK(leaderClient.performRequest(rolloverRequest));
+                verifyDataStream(
+                    leaderClient,
+                    dataStreamName,
+                    backingIndexName(dataStreamName, 1),
+                    backingIndexName(dataStreamName, 2),
+                    backingIndexName(dataStreamName, 3)
+                );
+
+                Request indexRequest = new Request("POST", "/" + dataStreamName + "/_doc");
+                indexRequest.addParameter("refresh", "true");
+                indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}");
+                assertOK(leaderClient.performRequest(indexRequest));
+                verifyDocuments(leaderClient, dataStreamName, numDocs + 2);
+            }
+            assertBusy(() -> {
+                assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 3));
+                verifyDataStream(
+                    client(),
+                    dataStreamNameFollower,
+                    backingIndexName(dataStreamNameFollower, 1),
+                    backingIndexName(dataStreamNameFollower, 2),
+                    backingIndexName(dataStreamNameFollower, 3)
+                );
+                ensureYellow(dataStreamNameFollower);
+                verifyDocuments(client(), dataStreamNameFollower, numDocs + 2);
+            });
+
+        } finally {
+            cleanUpFollower(
+                List.of(
+                    backingIndexName(dataStreamNameFollower, 1),
+                    backingIndexName(dataStreamNameFollower, 2),
+                    backingIndexName(dataStreamNameFollower, 3)
+                ),
+                List.of(dataStreamNameFollower),
+                List.of(autoFollowPatternName)
+            );
+            cleanUpLeader(
+                List.of(backingIndexName(dataStreamName, 1), backingIndexName(dataStreamName, 2), backingIndexName(dataStreamName, 3)),
+                List.of(dataStreamName),
+                List.of()
+            );
+        }
+    }
+
     public void testDataStreams_autoFollowAfterDataStreamCreated() throws Exception {
         if ("follow".equals(targetCluster) == false) {
             return;
@@ -353,7 +468,7 @@ public class AutoFollowIT extends ESCCRRestTestCase {
             }
 
             // Create auto follow pattern
-            createAutoFollowPattern(client(), autoFollowPatternName, dataStreamName + "*", "leader_cluster");
+            createAutoFollowPattern(client(), autoFollowPatternName, dataStreamName + "*", "leader_cluster", null);
 
             // Rollover and ensure only second backing index is replicated:
             try (RestClient leaderClient = buildLeaderClient()) {
@@ -410,7 +525,7 @@ public class AutoFollowIT extends ESCCRRestTestCase {
         List<String> backingIndexNames = null;
         try {
             // Create auto follow pattern
-            createAutoFollowPattern(client(), autoFollowPatternName, "logs-tomcat-*", "leader_cluster");
+            createAutoFollowPattern(client(), autoFollowPatternName, "logs-tomcat-*", "leader_cluster", null);
 
             // Create data stream and ensure that is is auto followed
             try (var leaderClient = buildLeaderClient()) {
@@ -531,7 +646,7 @@ public class AutoFollowIT extends ESCCRRestTestCase {
         int initialNumberOfSuccessfulFollowedIndices = getNumberOfSuccessfulFollowedIndices();
         try {
             // Create auto follow pattern
-            createAutoFollowPattern(client(), "test_pattern", "log-*", "leader_cluster");
+            createAutoFollowPattern(client(), "test_pattern", "log-*", "leader_cluster", null);
 
             // Create leader index and write alias:
             try (var leaderClient = buildLeaderClient()) {
@@ -618,7 +733,7 @@ public class AutoFollowIT extends ESCCRRestTestCase {
 
         try {
             // Create auto follow pattern in follow cluster
-            createAutoFollowPattern(client(), "id1", "logs-*-eu", "leader_cluster");
+            createAutoFollowPattern(client(), "id1", "logs-*-eu", "leader_cluster", null);
 
             // Create auto follow pattern in leader cluster:
             try (var leaderClient = buildLeaderClient()) {
@@ -658,7 +773,7 @@ public class AutoFollowIT extends ESCCRRestTestCase {
                 }
                 assertOK(leaderClient.performRequest(request));
                 // Then create the actual auto follow pattern:
-                createAutoFollowPattern(leaderClient, "id2", "logs-*-na", "follower_cluster");
+                createAutoFollowPattern(leaderClient, "id2", "logs-*-na", "follower_cluster", null);
             }
 
             var numDocs = 128;
@@ -832,7 +947,7 @@ public class AutoFollowIT extends ESCCRRestTestCase {
         final String mountedIndex = testPrefix + "-mounted";
 
         try {
-            createAutoFollowPattern(client(), autoFollowPattern, testPrefix + "-*", "leader_cluster");
+            createAutoFollowPattern(client(), autoFollowPattern, testPrefix + "-*", "leader_cluster", null);
 
             // Create a regular index on leader
             try (var leaderClient = buildLeaderClient()) {

+ 0 - 20
x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java

@@ -180,26 +180,6 @@ public class FollowIndexIT extends ESCCRRestTestCase {
         assertThat(failure.getMessage(), containsString("cannot follow [logs-syslog-prod], because it is a DATA_STREAM"));
     }
 
-    public void testChangeBackingIndexNameFails() throws Exception {
-        if ("follow".equals(targetCluster) == false) {
-            return;
-        }
-
-        final String dataStreamName = "logs-foobar-prod";
-        try (RestClient leaderClient = buildLeaderClient()) {
-            Request request = new Request("PUT", "/_data_stream/" + dataStreamName);
-            assertOK(leaderClient.performRequest(request));
-            verifyDataStream(leaderClient, dataStreamName, DataStream.getDefaultBackingIndexName("logs-foobar-prod", 1));
-        }
-
-        ResponseException failure = expectThrows(
-            ResponseException.class,
-            () -> followIndex(DataStream.getDefaultBackingIndexName("logs-foobar-prod", 1), ".ds-logs-barbaz-prod-000001")
-        );
-        assertThat(failure.getResponse().getStatusLine().getStatusCode(), equalTo(400));
-        assertThat(failure.getMessage(), containsString("a backing index name in the local and remote cluster must remain the same"));
-    }
-
     public void testFollowSearchableSnapshotsFails() throws Exception {
         final String testPrefix = getTestName().toLowerCase(Locale.ROOT);
 

+ 1 - 1
x-pack/plugin/ccr/qa/security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java

@@ -281,7 +281,7 @@ public class FollowIndexSecurityIT extends ESCCRRestTestCase {
 
         // Setup
         {
-            createAutoFollowPattern(adminClient(), "test_pattern", "logs-eu*", "leader_cluster");
+            createAutoFollowPattern(adminClient(), "test_pattern", "logs-eu*", "leader_cluster", null);
         }
         // Create data stream and ensure that it is auto followed
         {

+ 10 - 1
x-pack/plugin/ccr/qa/src/main/java/org/elasticsearch/xpack/ccr/ESCCRRestTestCase.java

@@ -335,7 +335,13 @@ public class ESCCRRestTestCase extends ESRestTestCase {
         return List.copyOf(actualBackingIndices);
     }
 
-    protected static void createAutoFollowPattern(RestClient client, String name, String pattern, String remoteCluster) throws IOException {
+    protected static void createAutoFollowPattern(
+        RestClient client,
+        String name,
+        String pattern,
+        String remoteCluster,
+        String followIndexPattern
+    ) throws IOException {
         Request request = new Request("PUT", "/_ccr/auto_follow/" + name);
         try (XContentBuilder bodyBuilder = JsonXContent.contentBuilder()) {
             bodyBuilder.startObject();
@@ -345,6 +351,9 @@ public class ESCCRRestTestCase extends ESRestTestCase {
                     bodyBuilder.value(pattern);
                 }
                 bodyBuilder.endArray();
+                if (followIndexPattern != null) {
+                    bodyBuilder.field("follow_index_pattern", followIndexPattern);
+                }
                 bodyBuilder.field("remote_cluster", remoteCluster);
             }
             bodyBuilder.endObject();

+ 118 - 10
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java

@@ -19,6 +19,7 @@ import org.elasticsearch.cluster.ClusterChangedEvent;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateListener;
 import org.elasticsearch.cluster.ClusterStateUpdateTask;
+import org.elasticsearch.cluster.metadata.DataStream;
 import org.elasticsearch.cluster.metadata.IndexAbstraction;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.Metadata;
@@ -61,6 +62,8 @@ import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.LongSupplier;
 import java.util.function.Supplier;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
 import static org.elasticsearch.core.Strings.format;
@@ -72,9 +75,24 @@ import static org.elasticsearch.xpack.core.ccr.AutoFollowStats.AutoFollowedClust
  */
 public class AutoFollowCoordinator extends AbstractLifecycleComponent implements ClusterStateListener {
 
+    /**
+     * This is the string that will be replaced by the leader index name for a backing index or data
+     * stream. It allows auto-following to automatically rename an index or data stream when
+     * automatically followed. For example, using "{{leader_index}}_copy" for the follow pattern
+     * means that a data stream called "logs-foo-bar" would be renamed "logs-foo-bar_copy" when
+     * replicated, and a backing index called ".ds-logs-foo-bar-2022-02-02-000001" would be renamed
+     * to ".ds-logs-foo-bar_copy-2022-02-02-000001".
+     * See {@link AutoFollower#getFollowerIndexName} for the entire usage.
+     */
+    public static final String AUTO_FOLLOW_PATTERN_REPLACEMENT = "{{leader_index}}";
+
     private static final Logger LOGGER = LogManager.getLogger(AutoFollowCoordinator.class);
     private static final int MAX_AUTO_FOLLOW_ERRORS = 256;
 
+    private static final Pattern DS_BACKING_PATTERN = Pattern.compile(
+        "^(.*?" + DataStream.BACKING_INDEX_PREFIX + ")(.+)-(\\d{4}.\\d{2}.\\d{2})(-[\\d]+)?$"
+    );
+
     private final Client client;
     private final ClusterService clusterService;
     private final CcrLicenseChecker ccrLicenseChecker;
@@ -563,6 +581,12 @@ public class AutoFollowCoordinator extends AbstractLifecycleComponent implements
             cleanFollowedRemoteIndices(remoteClusterState, patterns);
         }
 
+        /**
+         * Go through all the leader indices that need to be followed, ensuring that they are
+         * auto-followed by only a single pattern, have soft-deletes enabled, are not
+         * searchable snapshots, and are not already followed. If all of those conditions are met,
+         * then follow the indices.
+         */
         private void checkAutoFollowPattern(
             String autoFollowPattenName,
             String remoteClusterString,
@@ -582,8 +606,13 @@ public class AutoFollowCoordinator extends AbstractLifecycleComponent implements
                 leaderIndicesToFollow.size()
             );
 
+            // Loop through all the as-of-yet-unfollowed indices from the leader
             for (final Index indexToFollow : leaderIndicesToFollow) {
+                // Look up the abstraction for the given index, e.g., an index ".ds-foo" could look
+                // up the Data Stream "foo"
                 IndexAbstraction indexAbstraction = remoteMetadata.getIndicesLookup().get(indexToFollow.getName());
+                // Ensure that the remote cluster doesn't have other patterns
+                // that would follow the index, there can be only one.
                 List<String> otherMatchingPatterns = patternsForTheSameRemoteCluster.stream()
                     .filter(otherPattern -> otherPattern.v2().match(indexAbstraction))
                     .map(Tuple::v1)
@@ -605,6 +634,7 @@ public class AutoFollowCoordinator extends AbstractLifecycleComponent implements
                     );
                 } else {
                     final IndexMetadata leaderIndexMetadata = remoteMetadata.getIndexSafe(indexToFollow);
+                    // First ensure that the index on the leader that we want to follow has soft-deletes enabled
                     if (IndexSettings.INDEX_SOFT_DELETES_SETTING.get(leaderIndexMetadata.getSettings()) == false) {
                         String message = String.format(
                             Locale.ROOT,
@@ -639,10 +669,12 @@ public class AutoFollowCoordinator extends AbstractLifecycleComponent implements
                             error -> groupedListener.onResponse(new Tuple<>(indexToFollow, error))
                         );
                     } else {
+                        // Finally, if there are no reasons why we cannot follow the leader index, perform the follow.
                         followLeaderIndex(
                             autoFollowPattenName,
                             remoteClusterString,
                             indexToFollow,
+                            indexAbstraction,
                             autoFollowPattern,
                             headers,
                             error -> groupedListener.onResponse(new Tuple<>(indexToFollow, error))
@@ -669,22 +701,32 @@ public class AutoFollowCoordinator extends AbstractLifecycleComponent implements
             return false;
         }
 
-        private void followLeaderIndex(
-            String autoFollowPattenName,
-            String remoteClusterString,
+        /**
+         * Given a remote cluster, index that will be followed (and its abstraction), as well as an
+         * {@link AutoFollowPattern}, generate the internal follow request for following the index.
+         */
+        static PutFollowAction.Request generateRequest(
+            String remoteCluster,
             Index indexToFollow,
-            AutoFollowPattern pattern,
-            Map<String, String> headers,
-            Consumer<Exception> onResult
+            IndexAbstraction indexAbstraction,
+            AutoFollowPattern pattern
         ) {
             final String leaderIndexName = indexToFollow.getName();
             final String followIndexName = getFollowerIndexName(pattern, leaderIndexName);
 
             PutFollowAction.Request request = new PutFollowAction.Request();
-            request.setRemoteCluster(remoteClusterString);
+            request.setRemoteCluster(remoteCluster);
             request.setLeaderIndex(indexToFollow.getName());
             request.setFollowerIndex(followIndexName);
             request.setSettings(pattern.getSettings());
+            // If there was a pattern specified for renaming the backing index, and this index is
+            // part of a data stream, then send the new data stream name as part of the request.
+            if (pattern.getFollowIndexPattern() != null && indexAbstraction.getParentDataStream() != null) {
+                String dataStreamName = indexAbstraction.getParentDataStream().getDataStream().getName();
+                // Send the follow index pattern as the data stream pattern, so that data streams can be
+                // renamed accordingly (not only the backing indices)
+                request.setDataStreamName(pattern.getFollowIndexPattern().replace(AUTO_FOLLOW_PATTERN_REPLACEMENT, dataStreamName));
+            }
             request.getParameters().setMaxReadRequestOperationCount(pattern.getMaxReadRequestOperationCount());
             request.getParameters().setMaxReadRequestSize(pattern.getMaxReadRequestSize());
             request.getParameters().setMaxOutstandingReadRequests(pattern.getMaxOutstandingReadRequests());
@@ -697,9 +739,23 @@ public class AutoFollowCoordinator extends AbstractLifecycleComponent implements
             request.getParameters().setReadPollTimeout(pattern.getReadPollTimeout());
             request.masterNodeTimeout(TimeValue.MAX_VALUE);
 
+            return request;
+        }
+
+        private void followLeaderIndex(
+            String autoFollowPattenName,
+            String remoteClusterString,
+            Index indexToFollow,
+            IndexAbstraction indexAbstraction,
+            AutoFollowPattern pattern,
+            Map<String, String> headers,
+            Consumer<Exception> onResult
+        ) {
+            PutFollowAction.Request request = generateRequest(remoteClusterString, indexToFollow, indexAbstraction, pattern);
+
             // Execute if the create and follow api call succeeds:
             Runnable successHandler = () -> {
-                LOGGER.info("auto followed leader index [{}] as follow index [{}]", indexToFollow, followIndexName);
+                LOGGER.info("auto followed leader index [{}] as follow index [{}]", indexToFollow, request.getFollowerIndex());
 
                 // This function updates the auto follow metadata in the cluster to record that the leader index has been followed:
                 // (so that we do not try to follow it in subsequent auto follow runs)
@@ -731,6 +787,22 @@ public class AutoFollowCoordinator extends AbstractLifecycleComponent implements
             }
         }
 
+        /**
+         * Given an auto following pattern for a set of indices and the cluster state from a remote
+         * cluster, return the list of indices that need to be followed. The list of followed index
+         * UUIDs contains indices that have already been followed, so the returned list will only
+         * contain "new" indices from the leader that need to be followed.
+         *
+         * When looking up the name of the index to see if it matches one of the patterns, the index
+         * abstraction ({@link IndexAbstraction}) of the index is used for comparison, this means
+         * that if an index named ".ds-foo" was part of a data stream "foo", then an auto-follow
+         * pattern of "f*" would allow the ".ds-foo" index to be returned.
+         *
+         * @param autoFollowPattern pattern to check indices that may need to be followed
+         * @param remoteClusterState state from the remote ES cluster
+         * @param followedIndexUUIDs a collection of UUIDs of indices already being followed
+         * @return any new indices on the leader that need to be followed
+         */
         static List<Index> getLeaderIndicesToFollow(
             AutoFollowPattern autoFollowPattern,
             ClusterState remoteClusterState,
@@ -760,9 +832,45 @@ public class AutoFollowCoordinator extends AbstractLifecycleComponent implements
             return leaderIndicesToFollow;
         }
 
+        /**
+         * Returns the new name for the follower index. If the auto-follow configuration includes a
+         * follow index pattern, the text "{@code {{leader_index}}}" is replaced with the original
+         * index name, so a leader index called "foo" and a pattern of "{{leader_index}}_copy"
+         * becomes a new follower index called "foo_copy".
+         */
         static String getFollowerIndexName(AutoFollowPattern autoFollowPattern, String leaderIndexName) {
-            if (autoFollowPattern.getFollowIndexPattern() != null) {
-                return autoFollowPattern.getFollowIndexPattern().replace("{{leader_index}}", leaderIndexName);
+            final String followPattern = autoFollowPattern.getFollowIndexPattern();
+            if (followPattern != null) {
+                if (leaderIndexName.contains(DataStream.BACKING_INDEX_PREFIX)) {
+                    // The index being replicated is a data stream backing index, so it's something
+                    // like: <optional-prefix>.ds-<data-stream-name>-20XX-mm-dd-NNNNNN
+                    //
+                    // However, we cannot just replace the name with the proposed follow index
+                    // pattern, or else we'll end up with something like ".ds-logs-foo-bar-2022-02-02-000001_copy"
+                    // for "{{leader_index}}_copy", which will cause problems because it doesn't
+                    // follow a parseable pattern. Instead it would be better to rename it as though
+                    // the data stream name was the leader index name, ending up with
+                    // ".ds-logs-foo-bar_copy-2022-02-02-000001" as the final index name.
+                    Matcher m = DS_BACKING_PATTERN.matcher(leaderIndexName);
+                    if (m.find()) {
+                        return m.group(1) + // Prefix including ".ds-"
+                            followPattern.replace(AUTO_FOLLOW_PATTERN_REPLACEMENT, m.group(2)) + // Data stream name changed
+                            "-" + // Hyphen separator
+                            m.group(3) + // Date math
+                            m.group(4);
+                    } else {
+                        throw new IllegalArgumentException(
+                            "unable to determine follower index name from leader index name ["
+                                + leaderIndexName
+                                + "] and follow index pattern: ["
+                                + followPattern
+                                + "], index appears to follow a regular data stream backing pattern, but could not be parsed"
+                        );
+                    }
+                } else {
+                    // If the index does nat contain a `.ds-<thing>`, then rename it as usual.
+                    return followPattern.replace("{{leader_index}}", leaderIndexName);
+                }
             } else {
                 return leaderIndexName;
             }

+ 40 - 17
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java

@@ -25,6 +25,7 @@ import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.IndexScopedSettings;
 import org.elasticsearch.common.settings.Settings;
@@ -169,17 +170,6 @@ public final class TransportPutFollowAction extends TransportMasterNodeAction<Pu
             return;
         }
 
-        if (remoteDataStream != null) {
-            // when following a backing index then the names of the backing index must be remain the same in the local
-            // and remote cluster.
-            if (request.getLeaderIndex().equals(request.getFollowerIndex()) == false) {
-                listener.onFailure(
-                    new IllegalArgumentException("a backing index name in the local and remote cluster must remain the same")
-                );
-                return;
-            }
-        }
-
         final Settings overrideSettings = Settings.builder()
             .put(IndexMetadata.SETTING_INDEX_PROVIDED_NAME, request.getFollowerIndex())
             .put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true)
@@ -215,15 +205,37 @@ public final class TransportPutFollowAction extends TransportMasterNodeAction<Pu
                     (delegatedListener, response) -> afterRestoreStarted(clientWithHeaders, request, delegatedListener, response)
                 );
                 if (remoteDataStream == null) {
+                    // If the index we're following is not part of a data stream, start the
+                    // restoration of the index normally.
                     restoreService.restoreSnapshot(restoreRequest, delegatelistener);
                 } else {
                     String followerIndexName = request.getFollowerIndex();
+                    // This method is used to update the metadata in the same cluster state
+                    // update as the snapshot is restored.
                     BiConsumer<ClusterState, Metadata.Builder> updater = (currentState, mdBuilder) -> {
-                        DataStream localDataStream = mdBuilder.dataStreamMetadata().dataStreams().get(remoteDataStream.getName());
-                        Index followerIndex = mdBuilder.get(followerIndexName).getIndex();
-                        assert followerIndex != null;
+                        final String localDataStreamName;
+
+                        // If we have been given a data stream name, use that name for the local
+                        // data stream. See the javadoc for AUTO_FOLLOW_PATTERN_REPLACEMENT
+                        // for more info.
+                        final String dsName = request.getDataStreamName();
+                        if (Strings.hasText(dsName)) {
+                            localDataStreamName = dsName;
+                        } else {
+                            // There was no specified name, use the original data stream name.
+                            localDataStreamName = remoteDataStream.getName();
+                        }
+                        final DataStream localDataStream = mdBuilder.dataStreamMetadata().dataStreams().get(localDataStreamName);
+                        final Index followerIndex = mdBuilder.get(followerIndexName).getIndex();
+                        assert followerIndex != null
+                            : "expected followerIndex " + followerIndexName + " to exist in the state, but it did not";
 
-                        DataStream updatedDataStream = updateLocalDataStream(followerIndex, localDataStream, remoteDataStream);
+                        final DataStream updatedDataStream = updateLocalDataStream(
+                            followerIndex,
+                            localDataStream,
+                            localDataStreamName,
+                            remoteDataStream
+                        );
                         mdBuilder.put(updatedDataStream);
                     };
                     restoreService.restoreSnapshot(restoreRequest, delegatelistener, updater);
@@ -303,12 +315,23 @@ public final class TransportPutFollowAction extends TransportMasterNodeAction<Pu
         );
     }
 
-    static DataStream updateLocalDataStream(Index backingIndexToFollow, DataStream localDataStream, DataStream remoteDataStream) {
+    /**
+     * Given the backing index that the follower is going to follow, the local data stream (if it
+     * exists) and the remote data stream, return the new local data stream for the local cluster
+     * (the follower) updated with whichever information is necessary to restore the new
+     * soon-to-be-followed index.
+     */
+    static DataStream updateLocalDataStream(
+        Index backingIndexToFollow,
+        DataStream localDataStream,
+        String localDataStreamName,
+        DataStream remoteDataStream
+    ) {
         if (localDataStream == null) {
             // The data stream and the backing indices have been created and validated in the remote cluster,
             // just copying the data stream is in this case safe.
             return new DataStream(
-                remoteDataStream.getName(),
+                localDataStreamName,
                 List.of(backingIndexToFollow),
                 remoteDataStream.getGeneration(),
                 remoteDataStream.getMetadata(),

+ 328 - 0
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java

@@ -14,6 +14,7 @@ import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.DataStream;
 import org.elasticsearch.cluster.metadata.DataStreamTestHelper;
+import org.elasticsearch.cluster.metadata.IndexAbstraction;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.routing.IndexRoutingTable;
@@ -32,6 +33,7 @@ import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.index.Index;
+import org.elasticsearch.index.IndexMode;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.test.ESTestCase;
@@ -74,6 +76,7 @@ import static org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator.AutoFollo
 import static org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator.AutoFollower.recordLeaderIndexAsFollowFunction;
 import static org.hamcrest.Matchers.anEmptyMap;
 import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
@@ -1001,6 +1004,331 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
             null
         );
         assertThat(AutoFollower.getFollowerIndexName(autoFollowPattern, "metrics-0"), equalTo("eu-metrics-0"));
+
+        // Test that index of data stream type name works correctly:
+        autoFollowPattern = new AutoFollowPattern(
+            "remote",
+            List.of("logs-*"),
+            List.of(),
+            "{{leader_index}}_copy",
+            Settings.EMPTY,
+            true,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null
+        );
+        assertThat(
+            AutoFollower.getFollowerIndexName(autoFollowPattern, ".ds-logs-foo-bar-2022-02-01-123456"),
+            equalTo(".ds-logs-foo-bar_copy-2022-02-01-123456")
+        );
+
+        autoFollowPattern = new AutoFollowPattern(
+            "remote",
+            List.of("logs-*"),
+            List.of(),
+            "prepend_{{leader_index}}",
+            Settings.EMPTY,
+            true,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null
+        );
+        assertThat(
+            AutoFollower.getFollowerIndexName(autoFollowPattern, ".ds-logs-foo-bar-2022-02-01-123456"),
+            equalTo(".ds-prepend_logs-foo-bar-2022-02-01-123456")
+        );
+
+    }
+
+    public void testGenerateRequest() {
+        // Renaming with a suffix and normal pattern backing indices
+        {
+            AutoFollowPattern pattern = new AutoFollowPattern(
+                "remote",
+                List.of("logs-*"),
+                List.of(),
+                "{{leader_index}}_copy",
+                Settings.EMPTY,
+                true,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null
+            );
+
+            Index index = new Index(".ds-logs-foo-bar-2022-02-01-123456", "uuid");
+            IndexAbstraction indexAbstraction = new IndexAbstraction.ConcreteIndex(
+                IndexMetadata.builder(index.getName())
+                    .settings(
+                        Settings.builder()
+                            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
+                            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
+                            .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
+                            .put(IndexMetadata.SETTING_INDEX_UUID, index.getUUID())
+                            .build()
+                    )
+                    .build(),
+                new IndexAbstraction.DataStream(
+                    new DataStream("logs-foo-bar", List.of(index), 1, Map.of(), false, false, false, true, IndexMode.STANDARD)
+                )
+            );
+
+            PutFollowAction.Request request = AutoFollower.generateRequest("remote", index, indexAbstraction, pattern);
+            assertThat(request.getRemoteCluster(), equalTo("remote"));
+            assertThat(request.getFollowerIndex(), equalTo(".ds-logs-foo-bar_copy-2022-02-01-123456"));
+            assertThat(request.getLeaderIndex(), equalTo(".ds-logs-foo-bar-2022-02-01-123456"));
+            assertThat(request.getDataStreamName(), equalTo("logs-foo-bar_copy"));
+        }
+
+        // Renaming with a prefix and normal pattern backing indices
+        {
+            AutoFollowPattern pattern = new AutoFollowPattern(
+                "remote",
+                List.of("logs-*"),
+                List.of(),
+                "copy_{{leader_index}}",
+                Settings.EMPTY,
+                true,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null
+            );
+
+            Index index = new Index(".ds-logs-foo-bar-2022-02-01-123456", "uuid");
+            IndexAbstraction indexAbstraction = new IndexAbstraction.ConcreteIndex(
+                IndexMetadata.builder(index.getName())
+                    .settings(
+                        Settings.builder()
+                            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
+                            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
+                            .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
+                            .put(IndexMetadata.SETTING_INDEX_UUID, index.getUUID())
+                            .build()
+                    )
+                    .build(),
+                new IndexAbstraction.DataStream(
+                    new DataStream("logs-foo-bar", List.of(index), 1, Map.of(), false, false, false, true, IndexMode.STANDARD)
+                )
+            );
+
+            PutFollowAction.Request request = AutoFollower.generateRequest("remote", index, indexAbstraction, pattern);
+            assertThat(request.getRemoteCluster(), equalTo("remote"));
+            assertThat(request.getFollowerIndex(), equalTo(".ds-copy_logs-foo-bar-2022-02-01-123456"));
+            assertThat(request.getLeaderIndex(), equalTo(".ds-logs-foo-bar-2022-02-01-123456"));
+            assertThat(request.getDataStreamName(), equalTo("copy_logs-foo-bar"));
+        }
+
+        // Renaming with a suffix and irregular pattern backing indices
+        {
+            AutoFollowPattern pattern = new AutoFollowPattern(
+                "remote",
+                List.of("logs-*"),
+                List.of(),
+                "{{leader_index}}_copy",
+                Settings.EMPTY,
+                true,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null
+            );
+
+            Index index = new Index("my-backing-index", "uuid");
+            IndexAbstraction indexAbstraction = new IndexAbstraction.ConcreteIndex(
+                IndexMetadata.builder(index.getName())
+                    .settings(
+                        Settings.builder()
+                            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
+                            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
+                            .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
+                            .put(IndexMetadata.SETTING_INDEX_UUID, index.getUUID())
+                            .build()
+                    )
+                    .build(),
+                new IndexAbstraction.DataStream(
+                    new DataStream("logs-foo-bar", List.of(index), 1, Map.of(), false, false, false, true, IndexMode.STANDARD)
+                )
+            );
+
+            PutFollowAction.Request request = AutoFollower.generateRequest("remote", index, indexAbstraction, pattern);
+            assertThat(request.getRemoteCluster(), equalTo("remote"));
+            assertThat(request.getFollowerIndex(), equalTo("my-backing-index_copy"));
+            assertThat(request.getLeaderIndex(), equalTo("my-backing-index"));
+            assertThat(request.getDataStreamName(), equalTo("logs-foo-bar_copy"));
+        }
+
+        // Renaming with a suffix but not part of a data stream
+        {
+            AutoFollowPattern pattern = new AutoFollowPattern(
+                "remote",
+                List.of("logs-*"),
+                List.of(),
+                "{{leader_index}}_copy",
+                Settings.EMPTY,
+                true,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null
+            );
+
+            Index index = new Index(".ds-logs-foo-bar-2022-02-01-123456", "uuid");
+            IndexAbstraction indexAbstraction = new IndexAbstraction.ConcreteIndex(
+                IndexMetadata.builder(index.getName())
+                    .settings(
+                        Settings.builder()
+                            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
+                            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
+                            .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
+                            .put(IndexMetadata.SETTING_INDEX_UUID, index.getUUID())
+                            .build()
+                    )
+                    .build(),
+                null
+            );
+
+            PutFollowAction.Request request = AutoFollower.generateRequest("remote", index, indexAbstraction, pattern);
+            assertThat(request.getRemoteCluster(), equalTo("remote"));
+            assertThat(request.getFollowerIndex(), equalTo(".ds-logs-foo-bar_copy-2022-02-01-123456"));
+            assertThat(request.getLeaderIndex(), equalTo(".ds-logs-foo-bar-2022-02-01-123456"));
+            assertThat(request.getDataStreamName(), equalTo(null));
+        }
+
+        // Regular backing index, but no renaming
+        {
+            AutoFollowPattern pattern = new AutoFollowPattern(
+                "remote",
+                List.of("logs-*"),
+                List.of(),
+                null,
+                Settings.EMPTY,
+                true,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null
+            );
+
+            Index index = new Index(".ds-logs-foo-bar-2022-02-01-123456", "uuid");
+            IndexAbstraction indexAbstraction = new IndexAbstraction.ConcreteIndex(
+                IndexMetadata.builder(index.getName())
+                    .settings(
+                        Settings.builder()
+                            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
+                            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
+                            .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
+                            .put(IndexMetadata.SETTING_INDEX_UUID, index.getUUID())
+                            .build()
+                    )
+                    .build(),
+                new IndexAbstraction.DataStream(
+                    new DataStream("logs-foo-bar", List.of(index), 1, Map.of(), false, false, false, true, IndexMode.STANDARD)
+                )
+            );
+
+            PutFollowAction.Request request = AutoFollower.generateRequest("remote", index, indexAbstraction, pattern);
+            assertThat(request.getRemoteCluster(), equalTo("remote"));
+            assertThat(request.getFollowerIndex(), equalTo(".ds-logs-foo-bar-2022-02-01-123456"));
+            assertThat(request.getLeaderIndex(), equalTo(".ds-logs-foo-bar-2022-02-01-123456"));
+            assertThat(request.getDataStreamName(), equalTo(null));
+        }
+
+        // Renaming with a suffix and just the worst named backing indices
+        {
+            AutoFollowPattern pattern = new AutoFollowPattern(
+                "remote",
+                List.of("logs-*"),
+                List.of(),
+                "{{leader_index}}_copy",
+                Settings.EMPTY,
+                true,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null
+            );
+
+            Index index = new Index("my-.ds-backing-index", "uuid");
+            IndexAbstraction indexAbstraction = new IndexAbstraction.ConcreteIndex(
+                IndexMetadata.builder(index.getName())
+                    .settings(
+                        Settings.builder()
+                            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
+                            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
+                            .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
+                            .put(IndexMetadata.SETTING_INDEX_UUID, index.getUUID())
+                            .build()
+                    )
+                    .build(),
+                new IndexAbstraction.DataStream(
+                    new DataStream("logs-foo-bar", List.of(index), 1, Map.of(), false, false, false, true, IndexMode.STANDARD)
+                )
+            );
+
+            IllegalArgumentException e = expectThrows(
+                IllegalArgumentException.class,
+                () -> AutoFollower.generateRequest("remote", index, indexAbstraction, pattern)
+            );
+            assertThat(
+                e.getMessage(),
+                containsString(
+                    "unable to determine follower index name from leader index name "
+                        + "[my-.ds-backing-index] and follow index pattern: [{{leader_index}}_copy]"
+                        + ", index appears to follow a regular data stream backing pattern, but could not be parsed"
+                )
+            );
+        }
     }
 
     public void testStats() {

+ 5 - 0
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowParametersTests.java

@@ -38,6 +38,11 @@ public class FollowParametersTests extends AbstractSerializingTestCase<FollowPar
         return FollowParameters::new;
     }
 
+    @Override
+    protected FollowParameters mutateInstance(FollowParameters instance) {
+        return randomInstance();
+    }
+
     static FollowParameters randomInstance() {
         FollowParameters followParameters = new FollowParameters();
         followParameters.setMaxOutstandingReadRequests(randomIntBetween(0, Integer.MAX_VALUE));

+ 37 - 0
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/PutFollowActionRequestTests.java

@@ -11,6 +11,7 @@ import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.test.AbstractSerializingTestCase;
+import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.xcontent.XContentParser;
 import org.elasticsearch.xcontent.XContentType;
 import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
@@ -38,6 +39,7 @@ public class PutFollowActionRequestTests extends AbstractSerializingTestCase<Put
             Settings.builder().put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), randomIntBetween(0, 4)).build()
         );
         ResumeFollowActionRequestTests.generateFollowParameters(request.getParameters());
+        request.setDataStreamName(randomAlphaOfLength(4));
         return request;
     }
 
@@ -53,6 +55,7 @@ public class PutFollowActionRequestTests extends AbstractSerializingTestCase<Put
         );
         request.setFollowerIndex("followerIndex");
         ResumeFollowActionRequestTests.generateFollowParameters(request.getParameters());
+        request.setDataStreamName(randomAlphaOfLength(4));
         return request;
     }
 
@@ -61,6 +64,40 @@ public class PutFollowActionRequestTests extends AbstractSerializingTestCase<Put
         return PutFollowAction.Request.fromXContent(parser, "followerIndex", ActiveShardCount.DEFAULT);
     }
 
+    @Override
+    protected PutFollowAction.Request mutateInstance(PutFollowAction.Request instance) throws IOException {
+        PutFollowAction.Request request = new PutFollowAction.Request();
+        request.setFollowerIndex(instance.getFollowerIndex());
+        request.waitForActiveShards(instance.waitForActiveShards());
+        request.setRemoteCluster(instance.getRemoteCluster());
+        request.setLeaderIndex(instance.getLeaderIndex());
+        request.setSettings(instance.getSettings());
+        request.setParameters(instance.getParameters());
+        request.setDataStreamName(instance.getDataStreamName());
+
+        switch (randomIntBetween(0, 6)) {
+            case 0 -> request.setFollowerIndex(randomAlphaOfLength(5));
+            case 1 -> request.waitForActiveShards(new ActiveShardCount(randomIntBetween(3, 5)));
+            case 2 -> request.setRemoteCluster(randomAlphaOfLength(5));
+            case 3 -> request.setLeaderIndex(randomAlphaOfLength(5));
+            case 4 -> request.setSettings(
+                Settings.builder()
+                    .put(
+                        IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(),
+                        randomValueOtherThan(
+                            IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.get(request.getSettings()),
+                            ESTestCase::randomInt
+                        )
+                    )
+                    .build()
+            );
+            case 5 -> request.setParameters(FollowParametersTests.randomInstance());
+            case 6 -> request.setDataStreamName(randomAlphaOfLength(5));
+            default -> throw new AssertionError("failed branch");
+        }
+        return request;
+    }
+
     @Override
     protected boolean supportsUnknownFields() {
         return false;

+ 24 - 4
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowActionTests.java

@@ -24,7 +24,12 @@ public class TransportPutFollowActionTests extends ESTestCase {
     public void testCreateNewLocalDataStream() {
         DataStream remoteDataStream = generateDataSteam("logs-foobar", 3, false);
         Index backingIndexToFollow = remoteDataStream.getIndices().get(remoteDataStream.getIndices().size() - 1);
-        DataStream result = TransportPutFollowAction.updateLocalDataStream(backingIndexToFollow, null, remoteDataStream);
+        DataStream result = TransportPutFollowAction.updateLocalDataStream(
+            backingIndexToFollow,
+            null,
+            remoteDataStream.getName(),
+            remoteDataStream
+        );
         assertThat(result.getName(), equalTo(remoteDataStream.getName()));
         assertThat(result.getTimeStampField(), equalTo(remoteDataStream.getTimeStampField()));
         assertThat(result.getGeneration(), equalTo(remoteDataStream.getGeneration()));
@@ -36,7 +41,12 @@ public class TransportPutFollowActionTests extends ESTestCase {
         DataStream remoteDataStream = generateDataSteam("logs-foobar", 3, false);
         DataStream localDataStream = generateDataSteam("logs-foobar", 2, true);
         Index backingIndexToFollow = remoteDataStream.getIndices().get(remoteDataStream.getIndices().size() - 1);
-        DataStream result = TransportPutFollowAction.updateLocalDataStream(backingIndexToFollow, localDataStream, remoteDataStream);
+        DataStream result = TransportPutFollowAction.updateLocalDataStream(
+            backingIndexToFollow,
+            localDataStream,
+            remoteDataStream.getName(),
+            remoteDataStream
+        );
         assertThat(result.getName(), equalTo(remoteDataStream.getName()));
         assertThat(result.getTimeStampField(), equalTo(remoteDataStream.getTimeStampField()));
         assertThat(result.getGeneration(), equalTo(remoteDataStream.getGeneration()));
@@ -51,7 +61,12 @@ public class TransportPutFollowActionTests extends ESTestCase {
         DataStream remoteDataStream = generateDataSteam("logs-foobar", 5, false);
         DataStream localDataStream = generateDataSteam("logs-foobar", 5, true, DataStream.getDefaultBackingIndexName("logs-foobar", 5));
         Index backingIndexToFollow = remoteDataStream.getIndices().get(0);
-        DataStream result = TransportPutFollowAction.updateLocalDataStream(backingIndexToFollow, localDataStream, remoteDataStream);
+        DataStream result = TransportPutFollowAction.updateLocalDataStream(
+            backingIndexToFollow,
+            localDataStream,
+            remoteDataStream.getName(),
+            remoteDataStream
+        );
         assertThat(result.getName(), equalTo(remoteDataStream.getName()));
         assertThat(result.getTimeStampField(), equalTo(remoteDataStream.getTimeStampField()));
         assertThat(result.getGeneration(), equalTo(remoteDataStream.getGeneration()));
@@ -62,7 +77,12 @@ public class TransportPutFollowActionTests extends ESTestCase {
         // follow second last backing index:
         localDataStream = result;
         backingIndexToFollow = remoteDataStream.getIndices().get(remoteDataStream.getIndices().size() - 2);
-        result = TransportPutFollowAction.updateLocalDataStream(backingIndexToFollow, localDataStream, remoteDataStream);
+        result = TransportPutFollowAction.updateLocalDataStream(
+            backingIndexToFollow,
+            localDataStream,
+            remoteDataStream.getName(),
+            remoteDataStream
+        );
         assertThat(result.getName(), equalTo(remoteDataStream.getName()));
         assertThat(result.getTimeStampField(), equalTo(remoteDataStream.getTimeStampField()));
         assertThat(result.getGeneration(), equalTo(remoteDataStream.getGeneration()));

+ 34 - 3
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/PutFollowAction.java

@@ -15,9 +15,11 @@ import org.elasticsearch.action.IndicesRequest;
 import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.action.support.master.AcknowledgedRequest;
+import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.core.Nullable;
 import org.elasticsearch.xcontent.ObjectParser;
 import org.elasticsearch.xcontent.ParseField;
 import org.elasticsearch.xcontent.ToXContentObject;
@@ -43,6 +45,7 @@ public final class PutFollowAction extends ActionType<PutFollowAction.Response>
         private static final ParseField REMOTE_CLUSTER_FIELD = new ParseField("remote_cluster");
         private static final ParseField LEADER_INDEX_FIELD = new ParseField("leader_index");
         private static final ParseField SETTINGS_FIELD = new ParseField("settings");
+        private static final ParseField DATA_STREAM_NAME = new ParseField("data_stream_name");
 
         // Note that Request should be the Value class here for this parser with a 'parameters' field that maps to
         // PutFollowParameters class. But since two minor version are already released with duplicate follow parameters
@@ -52,6 +55,7 @@ public final class PutFollowAction extends ActionType<PutFollowAction.Response>
         static {
             PARSER.declareString((putFollowParameters, value) -> putFollowParameters.remoteCluster = value, REMOTE_CLUSTER_FIELD);
             PARSER.declareString((putFollowParameters, value) -> putFollowParameters.leaderIndex = value, LEADER_INDEX_FIELD);
+            PARSER.declareString((putFollowParameters, value) -> putFollowParameters.dataStreamName = value, DATA_STREAM_NAME);
             PARSER.declareObject(
                 (putFollowParameters, value) -> putFollowParameters.settings = value,
                 (p, c) -> Settings.fromXContent(p),
@@ -69,6 +73,7 @@ public final class PutFollowAction extends ActionType<PutFollowAction.Response>
             request.setFollowerIndex(followerIndex);
             request.setRemoteCluster(parameters.remoteCluster);
             request.setLeaderIndex(parameters.leaderIndex);
+            request.setDataStreamName(parameters.dataStreamName);
             request.setSettings(parameters.settings);
             request.setParameters(parameters);
             return request;
@@ -76,8 +81,10 @@ public final class PutFollowAction extends ActionType<PutFollowAction.Response>
 
         private String remoteCluster;
         private String leaderIndex;
-        private Settings settings = Settings.EMPTY;
         private String followerIndex;
+        @Nullable
+        private String dataStreamName;
+        private Settings settings = Settings.EMPTY;
         private FollowParameters parameters = new FollowParameters();
         private ActiveShardCount waitForActiveShards = ActiveShardCount.NONE;
 
@@ -123,6 +130,15 @@ public final class PutFollowAction extends ActionType<PutFollowAction.Response>
             this.parameters = parameters;
         }
 
+        @Nullable
+        public String getDataStreamName() {
+            return dataStreamName;
+        }
+
+        public void setDataStreamName(String dataStreamName) {
+            this.dataStreamName = dataStreamName;
+        }
+
         public ActiveShardCount waitForActiveShards() {
             return waitForActiveShards;
         }
@@ -156,6 +172,9 @@ public final class PutFollowAction extends ActionType<PutFollowAction.Response>
             if (followerIndex == null) {
                 e = addValidationError("follower_index is missing", e);
             }
+            if (dataStreamName != null && Strings.hasText(dataStreamName) == false) {
+                e = addValidationError("data stream name must contain text if present", e);
+            }
             return e;
         }
 
@@ -179,6 +198,9 @@ public final class PutFollowAction extends ActionType<PutFollowAction.Response>
             }
             this.parameters = new FollowParameters(in);
             waitForActiveShards(ActiveShardCount.readFrom(in));
+            if (in.getVersion().onOrAfter(Version.V_8_5_0)) {
+                this.dataStreamName = in.readOptionalString();
+            }
         }
 
         @Override
@@ -192,6 +214,9 @@ public final class PutFollowAction extends ActionType<PutFollowAction.Response>
             }
             parameters.writeTo(out);
             waitForActiveShards.writeTo(out);
+            if (out.getVersion().onOrAfter(Version.V_8_5_0)) {
+                out.writeOptionalString(this.dataStreamName);
+            }
         }
 
         @Override
@@ -200,6 +225,9 @@ public final class PutFollowAction extends ActionType<PutFollowAction.Response>
             {
                 builder.field(REMOTE_CLUSTER_FIELD.getPreferredName(), remoteCluster);
                 builder.field(LEADER_INDEX_FIELD.getPreferredName(), leaderIndex);
+                if (dataStreamName != null) {
+                    builder.field(DATA_STREAM_NAME.getPreferredName(), dataStreamName);
+                }
                 if (settings.isEmpty() == false) {
                     builder.startObject(SETTINGS_FIELD.getPreferredName());
                     {
@@ -222,12 +250,14 @@ public final class PutFollowAction extends ActionType<PutFollowAction.Response>
                 && Objects.equals(leaderIndex, request.leaderIndex)
                 && Objects.equals(followerIndex, request.followerIndex)
                 && Objects.equals(parameters, request.parameters)
-                && Objects.equals(waitForActiveShards, request.waitForActiveShards);
+                && Objects.equals(waitForActiveShards, request.waitForActiveShards)
+                && Objects.equals(dataStreamName, request.dataStreamName)
+                && Objects.equals(settings, request.settings);
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(remoteCluster, leaderIndex, followerIndex, parameters, waitForActiveShards);
+            return Objects.hash(remoteCluster, leaderIndex, followerIndex, parameters, settings, waitForActiveShards, dataStreamName);
         }
 
         // This class only exists for reuse of the FollowParameters class, see comment above the parser field.
@@ -235,6 +265,7 @@ public final class PutFollowAction extends ActionType<PutFollowAction.Response>
 
             private String remoteCluster;
             private String leaderIndex;
+            private String dataStreamName;
             private Settings settings = Settings.EMPTY;
 
         }