|  | @@ -27,6 +27,7 @@ import org.elasticsearch.rest.RestStatus;
 | 
	
		
			
				|  |  |  import java.io.IOException;
 | 
	
		
			
				|  |  |  import java.text.SimpleDateFormat;
 | 
	
		
			
				|  |  |  import java.util.Date;
 | 
	
		
			
				|  |  | +import java.util.List;
 | 
	
		
			
				|  |  |  import java.util.Locale;
 | 
	
		
			
				|  |  |  import java.util.Map;
 | 
	
		
			
				|  |  |  import java.util.concurrent.TimeUnit;
 | 
	
	
		
			
				|  | @@ -54,40 +55,58 @@ public class AutoFollowIT extends ESCCRRestTestCase {
 | 
	
		
			
				|  |  |              return;
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        int initialNumberOfSuccessfulFollowedIndices = getNumberOfSuccessfulFollowedIndices();
 | 
	
		
			
				|  |  | -        Request putPatternRequest = new Request("PUT", "/_ccr/auto_follow/leader_cluster_pattern");
 | 
	
		
			
				|  |  | -        putPatternRequest.setJsonEntity("{\"leader_index_patterns\": [\"logs-*\"], \"remote_cluster\": \"leader_cluster\"}");
 | 
	
		
			
				|  |  | -        assertOK(client().performRequest(putPatternRequest));
 | 
	
		
			
				|  |  | -        putPatternRequest = new Request("PUT", "/_ccr/auto_follow/middle_cluster_pattern");
 | 
	
		
			
				|  |  | -        putPatternRequest.setJsonEntity("{\"leader_index_patterns\": [\"logs-*\"], \"remote_cluster\": \"middle_cluster\"}");
 | 
	
		
			
				|  |  | -        assertOK(client().performRequest(putPatternRequest));
 | 
	
		
			
				|  |  | -        try (RestClient leaderClient = buildLeaderClient()) {
 | 
	
		
			
				|  |  | -            Request request = new Request("PUT", "/logs-20190101");
 | 
	
		
			
				|  |  | -            request.setJsonEntity("{\"mappings\": {\"properties\": {\"field\": {\"type\": \"keyword\"}}}}");
 | 
	
		
			
				|  |  | -            assertOK(leaderClient.performRequest(request));
 | 
	
		
			
				|  |  | -            for (int i = 0; i < 5; i++) {
 | 
	
		
			
				|  |  | -                String id = Integer.toString(i);
 | 
	
		
			
				|  |  | -                index(leaderClient, "logs-20190101", id, "field", i, "filtered_field", "true");
 | 
	
		
			
				|  |  | +        try {
 | 
	
		
			
				|  |  | +            int initialNumberOfSuccessfulFollowedIndices = getNumberOfSuccessfulFollowedIndices();
 | 
	
		
			
				|  |  | +            Request putPatternRequest = new Request("PUT", "/_ccr/auto_follow/leader_cluster_pattern");
 | 
	
		
			
				|  |  | +            putPatternRequest.setJsonEntity("{\"leader_index_patterns\": [\"index-*\"], \"remote_cluster\": \"leader_cluster\"}");
 | 
	
		
			
				|  |  | +            assertOK(client().performRequest(putPatternRequest));
 | 
	
		
			
				|  |  | +            putPatternRequest = new Request("PUT", "/_ccr/auto_follow/middle_cluster_pattern");
 | 
	
		
			
				|  |  | +            putPatternRequest.setJsonEntity("{\"leader_index_patterns\": [\"index-*\"], \"remote_cluster\": \"middle_cluster\"}");
 | 
	
		
			
				|  |  | +            assertOK(client().performRequest(putPatternRequest));
 | 
	
		
			
				|  |  | +            try (RestClient leaderClient = buildLeaderClient()) {
 | 
	
		
			
				|  |  | +                Request request = new Request("PUT", "/index-20190101");
 | 
	
		
			
				|  |  | +                request.setJsonEntity("{\"mappings\": {\"properties\": {\"field\": {\"type\": \"keyword\"}}}}");
 | 
	
		
			
				|  |  | +                assertOK(leaderClient.performRequest(request));
 | 
	
		
			
				|  |  | +                for (int i = 0; i < 5; i++) {
 | 
	
		
			
				|  |  | +                    String id = Integer.toString(i);
 | 
	
		
			
				|  |  | +                    index(leaderClient, "index-20190101", id, "field", i, "filtered_field", "true");
 | 
	
		
			
				|  |  | +                }
 | 
	
		
			
				|  |  |              }
 | 
	
		
			
				|  |  | -        }
 | 
	
		
			
				|  |  | -        try (RestClient middleClient = buildMiddleClient()) {
 | 
	
		
			
				|  |  | -            Request request = new Request("PUT", "/logs-20200101");
 | 
	
		
			
				|  |  | -            request.setJsonEntity("{\"mappings\": {\"properties\": {\"field\": {\"type\": \"keyword\"}}}}");
 | 
	
		
			
				|  |  | -            assertOK(middleClient.performRequest(request));
 | 
	
		
			
				|  |  | -            for (int i = 0; i < 5; i++) {
 | 
	
		
			
				|  |  | -                String id = Integer.toString(i);
 | 
	
		
			
				|  |  | -                index(middleClient, "logs-20200101", id, "field", i, "filtered_field", "true");
 | 
	
		
			
				|  |  | +            try (RestClient middleClient = buildMiddleClient()) {
 | 
	
		
			
				|  |  | +                Request request = new Request("PUT", "/index-20200101");
 | 
	
		
			
				|  |  | +                request.setJsonEntity("{\"mappings\": {\"properties\": {\"field\": {\"type\": \"keyword\"}}}}");
 | 
	
		
			
				|  |  | +                assertOK(middleClient.performRequest(request));
 | 
	
		
			
				|  |  | +                for (int i = 0; i < 5; i++) {
 | 
	
		
			
				|  |  | +                    String id = Integer.toString(i);
 | 
	
		
			
				|  |  | +                    index(middleClient, "index-20200101", id, "field", i, "filtered_field", "true");
 | 
	
		
			
				|  |  | +                }
 | 
	
		
			
				|  |  |              }
 | 
	
		
			
				|  |  | -        }
 | 
	
		
			
				|  |  | -        assertBusy(() -> {
 | 
	
		
			
				|  |  | -            assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 2));
 | 
	
		
			
				|  |  | +            assertBusy(() -> {
 | 
	
		
			
				|  |  | +                assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 2));
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -            ensureYellow("logs-20190101");
 | 
	
		
			
				|  |  | -            ensureYellow("logs-20200101");
 | 
	
		
			
				|  |  | -            verifyDocuments("logs-20190101", 5, "filtered_field:true");
 | 
	
		
			
				|  |  | -            verifyDocuments("logs-20200101", 5, "filtered_field:true");
 | 
	
		
			
				|  |  | -        });
 | 
	
		
			
				|  |  | -        deleteAutoFollowPattern("leader_cluster_pattern");
 | 
	
		
			
				|  |  | +                ensureYellow("index-20190101");
 | 
	
		
			
				|  |  | +                ensureYellow("index-20200101");
 | 
	
		
			
				|  |  | +                verifyDocuments("index-20190101", 5, "filtered_field:true");
 | 
	
		
			
				|  |  | +                verifyDocuments("index-20200101", 5, "filtered_field:true");
 | 
	
		
			
				|  |  | +            });
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        } finally {
 | 
	
		
			
				|  |  | +            cleanUpFollower(
 | 
	
		
			
				|  |  | +                List.of("index-20190101", "index-20200101"),
 | 
	
		
			
				|  |  | +                List.of(),
 | 
	
		
			
				|  |  | +                List.of("leader_cluster_pattern", "middle_cluster_pattern")
 | 
	
		
			
				|  |  | +            );
 | 
	
		
			
				|  |  | +            cleanUpMiddle(
 | 
	
		
			
				|  |  | +                List.of("index-20200101"),
 | 
	
		
			
				|  |  | +                List.of(),
 | 
	
		
			
				|  |  | +                List.of()
 | 
	
		
			
				|  |  | +            );
 | 
	
		
			
				|  |  | +            cleanUpLeader(
 | 
	
		
			
				|  |  | +                List.of("index-20190101"),
 | 
	
		
			
				|  |  | +                List.of(),
 | 
	
		
			
				|  |  | +                List.of()
 | 
	
		
			
				|  |  | +            );
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      public void testAutoFollowPatterns() throws Exception {
 | 
	
	
		
			
				|  | @@ -96,57 +115,63 @@ public class AutoFollowIT extends ESCCRRestTestCase {
 | 
	
		
			
				|  |  |              return;
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        int initialNumberOfSuccessfulFollowedIndices = getNumberOfSuccessfulFollowedIndices();
 | 
	
		
			
				|  |  | -        Request request = new Request("PUT", "/_ccr/auto_follow/test_pattern");
 | 
	
		
			
				|  |  | -        final boolean overrideNumberOfReplicas = randomBoolean();
 | 
	
		
			
				|  |  | -        try (XContentBuilder bodyBuilder = JsonXContent.contentBuilder()) {
 | 
	
		
			
				|  |  | -            bodyBuilder.startObject();
 | 
	
		
			
				|  |  | -            {
 | 
	
		
			
				|  |  | -                bodyBuilder.startArray("leader_index_patterns");
 | 
	
		
			
				|  |  | +        final String autoFollowPatternName = getTestName().toLowerCase(Locale.ROOT);
 | 
	
		
			
				|  |  | +        try {
 | 
	
		
			
				|  |  | +            int initialNumberOfSuccessfulFollowedIndices = getNumberOfSuccessfulFollowedIndices();
 | 
	
		
			
				|  |  | +            Request request = new Request("PUT", "/_ccr/auto_follow/" + autoFollowPatternName);
 | 
	
		
			
				|  |  | +            final boolean overrideNumberOfReplicas = randomBoolean();
 | 
	
		
			
				|  |  | +            try (XContentBuilder bodyBuilder = JsonXContent.contentBuilder()) {
 | 
	
		
			
				|  |  | +                bodyBuilder.startObject();
 | 
	
		
			
				|  |  |                  {
 | 
	
		
			
				|  |  | -                    bodyBuilder.value("metrics-*");
 | 
	
		
			
				|  |  | -                }
 | 
	
		
			
				|  |  | -                bodyBuilder.endArray();
 | 
	
		
			
				|  |  | -                bodyBuilder.field("remote_cluster", "leader_cluster");
 | 
	
		
			
				|  |  | -                if (overrideNumberOfReplicas) {
 | 
	
		
			
				|  |  | -                    bodyBuilder.startObject("settings");
 | 
	
		
			
				|  |  | +                    bodyBuilder.startArray("leader_index_patterns");
 | 
	
		
			
				|  |  |                      {
 | 
	
		
			
				|  |  | -                        bodyBuilder.field("index.number_of_replicas", 0);
 | 
	
		
			
				|  |  | +                        bodyBuilder.value("metrics-*");
 | 
	
		
			
				|  |  | +                    }
 | 
	
		
			
				|  |  | +                    bodyBuilder.endArray();
 | 
	
		
			
				|  |  | +                    bodyBuilder.field("remote_cluster", "leader_cluster");
 | 
	
		
			
				|  |  | +                    if (overrideNumberOfReplicas) {
 | 
	
		
			
				|  |  | +                        bodyBuilder.startObject("settings");
 | 
	
		
			
				|  |  | +                        {
 | 
	
		
			
				|  |  | +                            bodyBuilder.field("index.number_of_replicas", 0);
 | 
	
		
			
				|  |  | +                        }
 | 
	
		
			
				|  |  | +                        bodyBuilder.endObject();
 | 
	
		
			
				|  |  |                      }
 | 
	
		
			
				|  |  | -                    bodyBuilder.endObject();
 | 
	
		
			
				|  |  |                  }
 | 
	
		
			
				|  |  | +                bodyBuilder.endObject();
 | 
	
		
			
				|  |  | +                request.setJsonEntity(Strings.toString(bodyBuilder));
 | 
	
		
			
				|  |  |              }
 | 
	
		
			
				|  |  | -            bodyBuilder.endObject();
 | 
	
		
			
				|  |  | -            request.setJsonEntity(Strings.toString(bodyBuilder));
 | 
	
		
			
				|  |  | -        }
 | 
	
		
			
				|  |  | -        assertOK(client().performRequest(request));
 | 
	
		
			
				|  |  | +            assertOK(client().performRequest(request));
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        try (RestClient leaderClient = buildLeaderClient()) {
 | 
	
		
			
				|  |  | -            request = new Request("PUT", "/metrics-20210101");
 | 
	
		
			
				|  |  | -            request.setJsonEntity("{\"mappings\": {\"properties\": {\"field\": {\"type\": \"keyword\"}}}}");
 | 
	
		
			
				|  |  | -            assertOK(leaderClient.performRequest(request));
 | 
	
		
			
				|  |  | +            try (RestClient leaderClient = buildLeaderClient()) {
 | 
	
		
			
				|  |  | +                request = new Request("PUT", "/metrics-20210101");
 | 
	
		
			
				|  |  | +                request.setJsonEntity("{\"mappings\": {\"properties\": {\"field\": {\"type\": \"keyword\"}}}}");
 | 
	
		
			
				|  |  | +                assertOK(leaderClient.performRequest(request));
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -            for (int i = 0; i < 5; i++) {
 | 
	
		
			
				|  |  | -                String id = Integer.toString(i);
 | 
	
		
			
				|  |  | -                index(leaderClient, "metrics-20210101", id, "field", i, "filtered_field", "true");
 | 
	
		
			
				|  |  | +                for (int i = 0; i < 5; i++) {
 | 
	
		
			
				|  |  | +                    String id = Integer.toString(i);
 | 
	
		
			
				|  |  | +                    index(leaderClient, "metrics-20210101", id, "field", i, "filtered_field", "true");
 | 
	
		
			
				|  |  | +                }
 | 
	
		
			
				|  |  |              }
 | 
	
		
			
				|  |  | -        }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        assertBusy(() -> {
 | 
	
		
			
				|  |  | -            assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 1));
 | 
	
		
			
				|  |  | -            ensureYellow("metrics-20210101");
 | 
	
		
			
				|  |  | -            verifyDocuments("metrics-20210101", 5, "filtered_field:true");
 | 
	
		
			
				|  |  | -            if (overrideNumberOfReplicas) {
 | 
	
		
			
				|  |  | -                assertThat(getIndexSettingsAsMap("metrics-20210101"), hasEntry("index.number_of_replicas", "0"));
 | 
	
		
			
				|  |  | -            } else {
 | 
	
		
			
				|  |  | -                assertThat(getIndexSettingsAsMap("metrics-20210101"), hasEntry("index.number_of_replicas", "1"));
 | 
	
		
			
				|  |  | -            }
 | 
	
		
			
				|  |  | -        });
 | 
	
		
			
				|  |  | -        assertBusy(() -> {
 | 
	
		
			
				|  |  | -            verifyCcrMonitoring("metrics-20210101", "metrics-20210101");
 | 
	
		
			
				|  |  | -            verifyAutoFollowMonitoring();
 | 
	
		
			
				|  |  | -        }, 30, TimeUnit.SECONDS);
 | 
	
		
			
				|  |  | -        deleteAutoFollowPattern("test_pattern");
 | 
	
		
			
				|  |  | +            assertBusy(() -> {
 | 
	
		
			
				|  |  | +                assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 1));
 | 
	
		
			
				|  |  | +                ensureYellow("metrics-20210101");
 | 
	
		
			
				|  |  | +                verifyDocuments("metrics-20210101", 5, "filtered_field:true");
 | 
	
		
			
				|  |  | +                if (overrideNumberOfReplicas) {
 | 
	
		
			
				|  |  | +                    assertThat(getIndexSettingsAsMap("metrics-20210101"), hasEntry("index.number_of_replicas", "0"));
 | 
	
		
			
				|  |  | +                } else {
 | 
	
		
			
				|  |  | +                    assertThat(getIndexSettingsAsMap("metrics-20210101"), hasEntry("index.number_of_replicas", "1"));
 | 
	
		
			
				|  |  | +                }
 | 
	
		
			
				|  |  | +            });
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            assertBusy(() -> {
 | 
	
		
			
				|  |  | +                verifyCcrMonitoring("metrics-20210101", "metrics-20210101");
 | 
	
		
			
				|  |  | +                verifyAutoFollowMonitoring();
 | 
	
		
			
				|  |  | +            }, 30, TimeUnit.SECONDS);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        } finally {
 | 
	
		
			
				|  |  | +            cleanUpFollower(List.of("metrics-20210101"), List.of(), List.of(autoFollowPatternName));
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      public void testPutAutoFollowPatternThatOverridesRequiredLeaderSetting() throws IOException {
 | 
	
	
		
			
				|  | @@ -195,14 +220,14 @@ public class AutoFollowIT extends ESCCRRestTestCase {
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          final int numDocs = 64;
 | 
	
		
			
				|  |  |          final String dataStreamName = "logs-mysql-error";
 | 
	
		
			
				|  |  | +        final String autoFollowPatternName = getTestName().toLowerCase(Locale.ROOT);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          int initialNumberOfSuccessfulFollowedIndices = getNumberOfSuccessfulFollowedIndices();
 | 
	
		
			
				|  |  | +        try {
 | 
	
		
			
				|  |  | +            // Create auto follow pattern
 | 
	
		
			
				|  |  | +            createAutoFollowPattern(client(), autoFollowPatternName, "logs-mysql-*", "leader_cluster");
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        // Create auto follow pattern
 | 
	
		
			
				|  |  | -        createAutoFollowPattern(client(), "test_pattern", "logs-*", "leader_cluster");
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -        // Create data stream and ensure that is is auto followed
 | 
	
		
			
				|  |  | -        {
 | 
	
		
			
				|  |  | +            // Create data stream and ensure that is is auto followed
 | 
	
		
			
				|  |  |              try (RestClient leaderClient = buildLeaderClient()) {
 | 
	
		
			
				|  |  |                  for (int i = 0; i < numDocs; i++) {
 | 
	
		
			
				|  |  |                      Request indexRequest = new Request("POST", "/" + dataStreamName + "/_doc");
 | 
	
	
		
			
				|  | @@ -219,10 +244,8 @@ public class AutoFollowIT extends ESCCRRestTestCase {
 | 
	
		
			
				|  |  |                  ensureYellow(dataStreamName);
 | 
	
		
			
				|  |  |                  verifyDocuments(client(), dataStreamName, numDocs);
 | 
	
		
			
				|  |  |              });
 | 
	
		
			
				|  |  | -        }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        // First rollover and ensure second backing index is replicated:
 | 
	
		
			
				|  |  | -        {
 | 
	
		
			
				|  |  | +            // First rollover and ensure second backing index is replicated:
 | 
	
		
			
				|  |  |              try (RestClient leaderClient = buildLeaderClient()) {
 | 
	
		
			
				|  |  |                  Request rolloverRequest = new Request("POST", "/" + dataStreamName + "/_rollover");
 | 
	
		
			
				|  |  |                  assertOK(leaderClient.performRequest(rolloverRequest));
 | 
	
	
		
			
				|  | @@ -240,10 +263,8 @@ public class AutoFollowIT extends ESCCRRestTestCase {
 | 
	
		
			
				|  |  |                  ensureYellow(dataStreamName);
 | 
	
		
			
				|  |  |                  verifyDocuments(client(), dataStreamName, numDocs + 1);
 | 
	
		
			
				|  |  |              });
 | 
	
		
			
				|  |  | -        }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        // Second rollover and ensure third backing index is replicated:
 | 
	
		
			
				|  |  | -        {
 | 
	
		
			
				|  |  | +            // Second rollover and ensure third backing index is replicated:
 | 
	
		
			
				|  |  |              try (RestClient leaderClient = buildLeaderClient()) {
 | 
	
		
			
				|  |  |                  Request rolloverRequest = new Request("POST", "/" + dataStreamName + "/_rollover");
 | 
	
		
			
				|  |  |                  assertOK(leaderClient.performRequest(rolloverRequest));
 | 
	
	
		
			
				|  | @@ -263,11 +284,18 @@ public class AutoFollowIT extends ESCCRRestTestCase {
 | 
	
		
			
				|  |  |                  ensureYellow(dataStreamName);
 | 
	
		
			
				|  |  |                  verifyDocuments(client(), dataStreamName, numDocs + 2);
 | 
	
		
			
				|  |  |              });
 | 
	
		
			
				|  |  | -        }
 | 
	
		
			
				|  |  | -        // Cleanup:
 | 
	
		
			
				|  |  | -        {
 | 
	
		
			
				|  |  | -            deleteAutoFollowPattern("test_pattern");
 | 
	
		
			
				|  |  | -            deleteDataStream(dataStreamName);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        } finally {
 | 
	
		
			
				|  |  | +            cleanUpFollower(
 | 
	
		
			
				|  |  | +                List.of(backingIndexName(dataStreamName, 1), backingIndexName(dataStreamName, 2), backingIndexName(dataStreamName, 3)),
 | 
	
		
			
				|  |  | +                List.of(dataStreamName),
 | 
	
		
			
				|  |  | +                List.of(autoFollowPatternName)
 | 
	
		
			
				|  |  | +            );
 | 
	
		
			
				|  |  | +            cleanUpLeader(
 | 
	
		
			
				|  |  | +                List.of(backingIndexName(dataStreamName, 1), backingIndexName(dataStreamName, 2), backingIndexName(dataStreamName, 3)),
 | 
	
		
			
				|  |  | +                List.of(dataStreamName),
 | 
	
		
			
				|  |  | +                List.of()
 | 
	
		
			
				|  |  | +            );
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -277,10 +305,12 @@ public class AutoFollowIT extends ESCCRRestTestCase {
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          final int initialNumDocs = 16;
 | 
	
		
			
				|  |  | -        final String dataStreamName = "logs-syslog-prod";
 | 
	
		
			
				|  |  |          int initialNumberOfSuccessfulFollowedIndices = getNumberOfSuccessfulFollowedIndices();
 | 
	
		
			
				|  |  | -        // Initialize data stream prior to auto following
 | 
	
		
			
				|  |  | -        {
 | 
	
		
			
				|  |  | +        final String dataStreamName = "logs-syslog-prod";
 | 
	
		
			
				|  |  | +        final String autoFollowPatternName = getTestName().toLowerCase(Locale.ROOT);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        try {
 | 
	
		
			
				|  |  | +            // Initialize data stream prior to auto following
 | 
	
		
			
				|  |  |              try (RestClient leaderClient = buildLeaderClient()) {
 | 
	
		
			
				|  |  |                  for (int i = 0; i < initialNumDocs; i++) {
 | 
	
		
			
				|  |  |                      Request indexRequest = new Request("POST", "/" + dataStreamName + "/_doc");
 | 
	
	
		
			
				|  | @@ -291,11 +321,11 @@ public class AutoFollowIT extends ESCCRRestTestCase {
 | 
	
		
			
				|  |  |                  verifyDataStream(leaderClient, dataStreamName, backingIndexName(dataStreamName, 1));
 | 
	
		
			
				|  |  |                  verifyDocuments(leaderClient, dataStreamName, initialNumDocs);
 | 
	
		
			
				|  |  |              }
 | 
	
		
			
				|  |  | -        }
 | 
	
		
			
				|  |  | -        // Create auto follow pattern
 | 
	
		
			
				|  |  | -        createAutoFollowPattern(client(), "test_pattern", "logs-*", "leader_cluster");
 | 
	
		
			
				|  |  | -        // Rollover and ensure only second backing index is replicated:
 | 
	
		
			
				|  |  | -        {
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            // Create auto follow pattern
 | 
	
		
			
				|  |  | +            createAutoFollowPattern(client(), autoFollowPatternName, "logs-syslog-*", "leader_cluster");
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            // Rollover and ensure only second backing index is replicated:
 | 
	
		
			
				|  |  |              try (RestClient leaderClient = buildLeaderClient()) {
 | 
	
		
			
				|  |  |                  Request rolloverRequest = new Request("POST", "/" + dataStreamName + "/_rollover");
 | 
	
		
			
				|  |  |                  assertOK(leaderClient.performRequest(rolloverRequest));
 | 
	
	
		
			
				|  | @@ -313,9 +343,8 @@ public class AutoFollowIT extends ESCCRRestTestCase {
 | 
	
		
			
				|  |  |                  ensureYellow(dataStreamName);
 | 
	
		
			
				|  |  |                  verifyDocuments(client(), dataStreamName, 1);
 | 
	
		
			
				|  |  |              });
 | 
	
		
			
				|  |  | -        }
 | 
	
		
			
				|  |  | -        // Explicitly follow the first backing index and check that the data stream in follow cluster is updated correctly:
 | 
	
		
			
				|  |  | -        {
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            // Explicitly follow the first backing index and check that the data stream in follow cluster is updated correctly:
 | 
	
		
			
				|  |  |              followIndex(backingIndexName(dataStreamName, 1), backingIndexName(dataStreamName, 1));
 | 
	
		
			
				|  |  |              assertBusy(() -> {
 | 
	
		
			
				|  |  |                  assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 1));
 | 
	
	
		
			
				|  | @@ -323,11 +352,18 @@ public class AutoFollowIT extends ESCCRRestTestCase {
 | 
	
		
			
				|  |  |                  ensureYellow(dataStreamName);
 | 
	
		
			
				|  |  |                  verifyDocuments(client(), dataStreamName, initialNumDocs + 1);
 | 
	
		
			
				|  |  |              });
 | 
	
		
			
				|  |  | -        }
 | 
	
		
			
				|  |  | -        // Cleanup:
 | 
	
		
			
				|  |  | -        {
 | 
	
		
			
				|  |  | -            deleteAutoFollowPattern("test_pattern");
 | 
	
		
			
				|  |  | -            deleteDataStream(dataStreamName);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        } finally {
 | 
	
		
			
				|  |  | +            cleanUpFollower(
 | 
	
		
			
				|  |  | +                List.of(backingIndexName(dataStreamName, 1), backingIndexName(dataStreamName, 2)),
 | 
	
		
			
				|  |  | +                List.of(dataStreamName),
 | 
	
		
			
				|  |  | +                List.of(autoFollowPatternName)
 | 
	
		
			
				|  |  | +            );
 | 
	
		
			
				|  |  | +            cleanUpLeader(
 | 
	
		
			
				|  |  | +                List.of(backingIndexName(dataStreamName, 1), backingIndexName(dataStreamName, 2)),
 | 
	
		
			
				|  |  | +                List.of(dataStreamName),
 | 
	
		
			
				|  |  | +                List.of()
 | 
	
		
			
				|  |  | +            );
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -338,14 +374,14 @@ public class AutoFollowIT extends ESCCRRestTestCase {
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          final int numDocs = 64;
 | 
	
		
			
				|  |  |          final var dataStreamName = "logs-tomcat-prod";
 | 
	
		
			
				|  |  | +        final String autoFollowPatternName = getTestName().toLowerCase(Locale.ROOT);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          int initialNumberOfSuccessfulFollowedIndices = getNumberOfSuccessfulFollowedIndices();
 | 
	
		
			
				|  |  | +        try {
 | 
	
		
			
				|  |  | +            // Create auto follow pattern
 | 
	
		
			
				|  |  | +            createAutoFollowPattern(client(), autoFollowPatternName, "logs-tomcat-*", "leader_cluster");
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        // Create auto follow pattern
 | 
	
		
			
				|  |  | -        createAutoFollowPattern(client(), "test_pattern", "logs-*", "leader_cluster");
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -        // Create data stream and ensure that is is auto followed
 | 
	
		
			
				|  |  | -        {
 | 
	
		
			
				|  |  | +            // Create data stream and ensure that is is auto followed
 | 
	
		
			
				|  |  |              try (var leaderClient = buildLeaderClient()) {
 | 
	
		
			
				|  |  |                  for (int i = 0; i < numDocs; i++) {
 | 
	
		
			
				|  |  |                      var indexRequest = new Request("POST", "/" + dataStreamName + "/_doc");
 | 
	
	
		
			
				|  | @@ -362,10 +398,8 @@ public class AutoFollowIT extends ESCCRRestTestCase {
 | 
	
		
			
				|  |  |                  ensureYellow(dataStreamName);
 | 
	
		
			
				|  |  |                  verifyDocuments(client(), dataStreamName, numDocs);
 | 
	
		
			
				|  |  |              });
 | 
	
		
			
				|  |  | -        }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        // Rollover in leader cluster and ensure second backing index is replicated:
 | 
	
		
			
				|  |  | -        {
 | 
	
		
			
				|  |  | +            // Rollover in leader cluster and ensure second backing index is replicated:
 | 
	
		
			
				|  |  |              try (var leaderClient = buildLeaderClient()) {
 | 
	
		
			
				|  |  |                  var rolloverRequest = new Request("POST", "/" + dataStreamName + "/_rollover");
 | 
	
		
			
				|  |  |                  assertOK(leaderClient.performRequest(rolloverRequest));
 | 
	
	
		
			
				|  | @@ -383,51 +417,58 @@ public class AutoFollowIT extends ESCCRRestTestCase {
 | 
	
		
			
				|  |  |                  ensureYellow(dataStreamName);
 | 
	
		
			
				|  |  |                  verifyDocuments(client(), dataStreamName, numDocs + 1);
 | 
	
		
			
				|  |  |              });
 | 
	
		
			
				|  |  | -        }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        // Try rollover in follow cluster
 | 
	
		
			
				|  |  | -        {
 | 
	
		
			
				|  |  | -            var rolloverRequest1 = new Request("POST", "/" + dataStreamName + "/_rollover");
 | 
	
		
			
				|  |  | -            var e = expectThrows(ResponseException.class, () -> client().performRequest(rolloverRequest1));
 | 
	
		
			
				|  |  | -            assertThat(e.getMessage(), containsString("data stream [" + dataStreamName + "] cannot be rolled over, " +
 | 
	
		
			
				|  |  | -                "because it is a replicated data stream"));
 | 
	
		
			
				|  |  | -            verifyDataStream(client(), dataStreamName, backingIndexName(dataStreamName, 1), backingIndexName(dataStreamName, 2));
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -            // Unfollow .ds-logs-tomcat-prod-000001
 | 
	
		
			
				|  |  | -            pauseFollow(backingIndexName(dataStreamName, 1));
 | 
	
		
			
				|  |  | -            closeIndex(backingIndexName(dataStreamName, 1));
 | 
	
		
			
				|  |  | -            unfollow(backingIndexName(dataStreamName, 1));
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -            // Try again
 | 
	
		
			
				|  |  | -            var rolloverRequest2 = new Request("POST", "/" + dataStreamName + "/_rollover");
 | 
	
		
			
				|  |  | -            e = expectThrows(ResponseException.class, () -> client().performRequest(rolloverRequest2));
 | 
	
		
			
				|  |  | -            assertThat(e.getMessage(), containsString("data stream [" + dataStreamName + "] cannot be rolled over, " +
 | 
	
		
			
				|  |  | -                "because it is a replicated data stream"));
 | 
	
		
			
				|  |  | -            verifyDataStream(client(), dataStreamName, backingIndexName(dataStreamName, 1), backingIndexName(dataStreamName, 2));
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -            // Promote local data stream
 | 
	
		
			
				|  |  | -            var promoteRequest = new Request("POST", "/_data_stream/_promote/" + dataStreamName);
 | 
	
		
			
				|  |  | -            assertOK(client().performRequest(promoteRequest));
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -            // Try again and now the rollover should be successful because local data stream is now :
 | 
	
		
			
				|  |  | -            var rolloverRequest3 = new Request("POST", "/" + dataStreamName + "/_rollover");
 | 
	
		
			
				|  |  | -            assertOK(client().performRequest(rolloverRequest3));
 | 
	
		
			
				|  |  | -            verifyDataStream(client(), dataStreamName, backingIndexName(dataStreamName, 1), backingIndexName(dataStreamName, 2),
 | 
	
		
			
				|  |  | -                backingIndexName(dataStreamName, 3));
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -            // TODO: verify that following a backing index for logs-tomcat-prod data stream in remote cluster fails,
 | 
	
		
			
				|  |  | -            // because local data stream isn't a replicated data stream anymore.
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -            // Unfollow .ds-logs-tomcat-prod-000002,
 | 
	
		
			
				|  |  | -            // which is now possible because this index can now be closed as it is no longer the write index.
 | 
	
		
			
				|  |  | -            pauseFollow(backingIndexName(dataStreamName, 2));
 | 
	
		
			
				|  |  | -            closeIndex(backingIndexName(dataStreamName, 2));
 | 
	
		
			
				|  |  | -            unfollow(backingIndexName(dataStreamName, 2));
 | 
	
		
			
				|  |  | -        }
 | 
	
		
			
				|  |  | -        // Cleanup:
 | 
	
		
			
				|  |  | -        {
 | 
	
		
			
				|  |  | -            deleteAutoFollowPattern("test_pattern");
 | 
	
		
			
				|  |  | -            deleteDataStream(dataStreamName);
 | 
	
		
			
				|  |  | +            // Try rollover in follow cluster
 | 
	
		
			
				|  |  | +            {
 | 
	
		
			
				|  |  | +                var rolloverRequest1 = new Request("POST", "/" + dataStreamName + "/_rollover");
 | 
	
		
			
				|  |  | +                var e = expectThrows(ResponseException.class, () -> client().performRequest(rolloverRequest1));
 | 
	
		
			
				|  |  | +                assertThat(e.getMessage(), containsString("data stream [" + dataStreamName + "] cannot be rolled over, " +
 | 
	
		
			
				|  |  | +                    "because it is a replicated data stream"));
 | 
	
		
			
				|  |  | +                verifyDataStream(client(), dataStreamName, backingIndexName(dataStreamName, 1), backingIndexName(dataStreamName, 2));
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                // Unfollow .ds-logs-tomcat-prod-000001
 | 
	
		
			
				|  |  | +                pauseFollow(backingIndexName(dataStreamName, 1));
 | 
	
		
			
				|  |  | +                closeIndex(backingIndexName(dataStreamName, 1));
 | 
	
		
			
				|  |  | +                unfollow(backingIndexName(dataStreamName, 1));
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                // Try again
 | 
	
		
			
				|  |  | +                var rolloverRequest2 = new Request("POST", "/" + dataStreamName + "/_rollover");
 | 
	
		
			
				|  |  | +                e = expectThrows(ResponseException.class, () -> client().performRequest(rolloverRequest2));
 | 
	
		
			
				|  |  | +                assertThat(e.getMessage(), containsString("data stream [" + dataStreamName + "] cannot be rolled over, " +
 | 
	
		
			
				|  |  | +                    "because it is a replicated data stream"));
 | 
	
		
			
				|  |  | +                verifyDataStream(client(), dataStreamName, backingIndexName(dataStreamName, 1), backingIndexName(dataStreamName, 2));
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                // Promote local data stream
 | 
	
		
			
				|  |  | +                var promoteRequest = new Request("POST", "/_data_stream/_promote/" + dataStreamName);
 | 
	
		
			
				|  |  | +                assertOK(client().performRequest(promoteRequest));
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                // Try again and now the rollover should be successful because local data stream is now :
 | 
	
		
			
				|  |  | +                var rolloverRequest3 = new Request("POST", "/" + dataStreamName + "/_rollover");
 | 
	
		
			
				|  |  | +                assertOK(client().performRequest(rolloverRequest3));
 | 
	
		
			
				|  |  | +                verifyDataStream(client(), dataStreamName, backingIndexName(dataStreamName, 1), backingIndexName(dataStreamName, 2),
 | 
	
		
			
				|  |  | +                    backingIndexName(dataStreamName, 3));
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                // TODO: verify that following a backing index for logs-tomcat-prod data stream in remote cluster fails,
 | 
	
		
			
				|  |  | +                // because local data stream isn't a replicated data stream anymore.
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                // Unfollow .ds-logs-tomcat-prod-000002,
 | 
	
		
			
				|  |  | +                // which is now possible because this index can now be closed as it is no longer the write index.
 | 
	
		
			
				|  |  | +                pauseFollow(backingIndexName(dataStreamName, 2));
 | 
	
		
			
				|  |  | +                closeIndex(backingIndexName(dataStreamName, 2));
 | 
	
		
			
				|  |  | +                unfollow(backingIndexName(dataStreamName, 2));
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        } finally {
 | 
	
		
			
				|  |  | +            cleanUpFollower(
 | 
	
		
			
				|  |  | +                List.of(backingIndexName(dataStreamName, 1), backingIndexName(dataStreamName, 2), backingIndexName(dataStreamName, 3)),
 | 
	
		
			
				|  |  | +                List.of(dataStreamName),
 | 
	
		
			
				|  |  | +                List.of(autoFollowPatternName)
 | 
	
		
			
				|  |  | +            );
 | 
	
		
			
				|  |  | +            cleanUpLeader(
 | 
	
		
			
				|  |  | +                List.of(backingIndexName(dataStreamName, 1), backingIndexName(dataStreamName, 2)),
 | 
	
		
			
				|  |  | +                List.of(dataStreamName),
 | 
	
		
			
				|  |  | +                List.of()
 | 
	
		
			
				|  |  | +            );
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -440,12 +481,11 @@ public class AutoFollowIT extends ESCCRRestTestCase {
 | 
	
		
			
				|  |  |          final var aliasName = "log-tomcat-prod";
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          int initialNumberOfSuccessfulFollowedIndices = getNumberOfSuccessfulFollowedIndices();
 | 
	
		
			
				|  |  | +        try {
 | 
	
		
			
				|  |  | +            // Create auto follow pattern
 | 
	
		
			
				|  |  | +            createAutoFollowPattern(client(), "test_pattern", "log-*", "leader_cluster");
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        // Create auto follow pattern
 | 
	
		
			
				|  |  | -        createAutoFollowPattern(client(), "test_pattern", "log-*", "leader_cluster");
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -        // Create leader index and write alias:
 | 
	
		
			
				|  |  | -        {
 | 
	
		
			
				|  |  | +            // Create leader index and write alias:
 | 
	
		
			
				|  |  |              try (var leaderClient = buildLeaderClient()) {
 | 
	
		
			
				|  |  |                  var createFirstIndexRequest = new Request("PUT", "/" + aliasName + "-000001");
 | 
	
		
			
				|  |  |                  createFirstIndexRequest.setJsonEntity("{\"aliases\": {\"" + aliasName + "\":{\"is_write_index\":true}}}");
 | 
	
	
		
			
				|  | @@ -466,10 +506,8 @@ public class AutoFollowIT extends ESCCRRestTestCase {
 | 
	
		
			
				|  |  |                  ensureYellow(aliasName);
 | 
	
		
			
				|  |  |                  verifyDocuments(client(), aliasName, numDocs);
 | 
	
		
			
				|  |  |              });
 | 
	
		
			
				|  |  | -        }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        // Rollover in leader cluster and ensure second backing index is replicated:
 | 
	
		
			
				|  |  | -        {
 | 
	
		
			
				|  |  | +            // Rollover in leader cluster and ensure second backing index is replicated:
 | 
	
		
			
				|  |  |              try (var leaderClient = buildLeaderClient()) {
 | 
	
		
			
				|  |  |                  var rolloverRequest = new Request("POST", "/" + aliasName + "/_rollover");
 | 
	
		
			
				|  |  |                  assertOK(leaderClient.performRequest(rolloverRequest));
 | 
	
	
		
			
				|  | @@ -487,19 +525,18 @@ public class AutoFollowIT extends ESCCRRestTestCase {
 | 
	
		
			
				|  |  |                  ensureYellow(aliasName);
 | 
	
		
			
				|  |  |                  verifyDocuments(client(), aliasName, numDocs + 1);
 | 
	
		
			
				|  |  |              });
 | 
	
		
			
				|  |  | -        }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        // Try rollover in follow cluster, this should fail, because is_write_index property of an alias isn't
 | 
	
		
			
				|  |  | -        // replicated to follow cluster.
 | 
	
		
			
				|  |  | -        {
 | 
	
		
			
				|  |  | -            var rolloverRequest1 = new Request("POST", "/" + aliasName + "/_rollover");
 | 
	
		
			
				|  |  | -            var e = expectThrows(ResponseException.class, () -> client().performRequest(rolloverRequest1));
 | 
	
		
			
				|  |  | -            assertThat(e.getMessage(), containsString("rollover target [" + aliasName + "] does not point to a write index"));
 | 
	
		
			
				|  |  | -            verifyAlias(client(), aliasName, false, aliasName + "-000002", aliasName + "-000001");
 | 
	
		
			
				|  |  | -        }
 | 
	
		
			
				|  |  | -        // Cleanup:
 | 
	
		
			
				|  |  | -        {
 | 
	
		
			
				|  |  | -            deleteAutoFollowPattern("test_pattern");
 | 
	
		
			
				|  |  | +            // Try rollover in follow cluster, this should fail, because is_write_index property of an alias isn't
 | 
	
		
			
				|  |  | +            // replicated to follow cluster.
 | 
	
		
			
				|  |  | +            {
 | 
	
		
			
				|  |  | +                var rolloverRequest1 = new Request("POST", "/" + aliasName + "/_rollover");
 | 
	
		
			
				|  |  | +                var e = expectThrows(ResponseException.class, () -> client().performRequest(rolloverRequest1));
 | 
	
		
			
				|  |  | +                assertThat(e.getMessage(), containsString("rollover target [" + aliasName + "] does not point to a write index"));
 | 
	
		
			
				|  |  | +                verifyAlias(client(), aliasName, false, aliasName + "-000002", aliasName + "-000001");
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        } finally {
 | 
	
		
			
				|  |  | +            cleanUpFollower(List.of(aliasName + "-000001", aliasName + "-000002"), List.of(), List.of("test_pattern"));
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -529,32 +566,39 @@ public class AutoFollowIT extends ESCCRRestTestCase {
 | 
	
		
			
				|  |  |          int initialNumberOfSuccessfulFollowedIndicesInFollowCluster = getNumberOfSuccessfulFollowedIndices();
 | 
	
		
			
				|  |  |          int initialNumberOfSuccessfulFollowedIndicesInLeaderCluster;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        // Create auto follow pattern in follow cluster
 | 
	
		
			
				|  |  | -        createAutoFollowPattern(client(), "id1", "logs-*-eu", "leader_cluster");
 | 
	
		
			
				|  |  | -        // Create auto follow pattern in leader cluster:
 | 
	
		
			
				|  |  | -        try (var leaderClient = buildLeaderClient()) {
 | 
	
		
			
				|  |  | -            initialNumberOfSuccessfulFollowedIndicesInLeaderCluster = getNumberOfSuccessfulFollowedIndices(leaderClient);
 | 
	
		
			
				|  |  | -            // First add remote cluster to leader cluster:
 | 
	
		
			
				|  |  | -            var request = new Request("PUT", "/_cluster/settings");
 | 
	
		
			
				|  |  | -            try (var bodyBuilder = JsonXContent.contentBuilder()) {
 | 
	
		
			
				|  |  | -                bodyBuilder.startObject();
 | 
	
		
			
				|  |  | -                {
 | 
	
		
			
				|  |  | -                    bodyBuilder.startObject("persistent");
 | 
	
		
			
				|  |  | +        var leaderDataStreamName = "logs-http-eu";
 | 
	
		
			
				|  |  | +        var followerDataStreamName = "logs-http-na";
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        try {
 | 
	
		
			
				|  |  | +            // Create auto follow pattern in follow cluster
 | 
	
		
			
				|  |  | +            createAutoFollowPattern(client(), "id1", "logs-*-eu", "leader_cluster");
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            // Create auto follow pattern in leader cluster:
 | 
	
		
			
				|  |  | +            try (var leaderClient = buildLeaderClient()) {
 | 
	
		
			
				|  |  | +                initialNumberOfSuccessfulFollowedIndicesInLeaderCluster = getNumberOfSuccessfulFollowedIndices(leaderClient);
 | 
	
		
			
				|  |  | +                // First add remote cluster to leader cluster:
 | 
	
		
			
				|  |  | +                var request = new Request("PUT", "/_cluster/settings");
 | 
	
		
			
				|  |  | +                try (var bodyBuilder = JsonXContent.contentBuilder()) {
 | 
	
		
			
				|  |  | +                    bodyBuilder.startObject();
 | 
	
		
			
				|  |  |                      {
 | 
	
		
			
				|  |  | -                        bodyBuilder.startObject("cluster");
 | 
	
		
			
				|  |  | +                        bodyBuilder.startObject("persistent");
 | 
	
		
			
				|  |  |                          {
 | 
	
		
			
				|  |  | -                            bodyBuilder.startObject("remote");
 | 
	
		
			
				|  |  | +                            bodyBuilder.startObject("cluster");
 | 
	
		
			
				|  |  |                              {
 | 
	
		
			
				|  |  | -                                bodyBuilder.startObject("follower_cluster");
 | 
	
		
			
				|  |  | +                                bodyBuilder.startObject("remote");
 | 
	
		
			
				|  |  |                                  {
 | 
	
		
			
				|  |  | -                                    bodyBuilder.startArray("seeds");
 | 
	
		
			
				|  |  | -                                    var nodesInfoRequest = new Request("GET", "/_nodes/_local");
 | 
	
		
			
				|  |  | -                                    var nodesInfoResponse = toMap(client().performRequest(nodesInfoRequest));
 | 
	
		
			
				|  |  | -                                    var node = (Map<?, ?>) ((Map<?, ?>) nodesInfoResponse.get("nodes")).values().iterator().next();
 | 
	
		
			
				|  |  | -                                    var transportMetrics = (Map<?, ?>) node.get("transport");
 | 
	
		
			
				|  |  | -                                    var address = (String) transportMetrics.get("publish_address");
 | 
	
		
			
				|  |  | -                                    bodyBuilder.value(address);
 | 
	
		
			
				|  |  | -                                    bodyBuilder.endArray();
 | 
	
		
			
				|  |  | +                                    bodyBuilder.startObject("follower_cluster");
 | 
	
		
			
				|  |  | +                                    {
 | 
	
		
			
				|  |  | +                                        bodyBuilder.startArray("seeds");
 | 
	
		
			
				|  |  | +                                        var nodesInfoRequest = new Request("GET", "/_nodes/_local");
 | 
	
		
			
				|  |  | +                                        var nodesInfoResponse = toMap(client().performRequest(nodesInfoRequest));
 | 
	
		
			
				|  |  | +                                        var node = (Map<?, ?>) ((Map<?, ?>) nodesInfoResponse.get("nodes")).values().iterator().next();
 | 
	
		
			
				|  |  | +                                        var transportMetrics = (Map<?, ?>) node.get("transport");
 | 
	
		
			
				|  |  | +                                        var address = (String) transportMetrics.get("publish_address");
 | 
	
		
			
				|  |  | +                                        bodyBuilder.value(address);
 | 
	
		
			
				|  |  | +                                        bodyBuilder.endArray();
 | 
	
		
			
				|  |  | +                                    }
 | 
	
		
			
				|  |  | +                                    bodyBuilder.endObject();
 | 
	
		
			
				|  |  |                                  }
 | 
	
		
			
				|  |  |                                  bodyBuilder.endObject();
 | 
	
		
			
				|  |  |                              }
 | 
	
	
		
			
				|  | @@ -563,19 +607,15 @@ public class AutoFollowIT extends ESCCRRestTestCase {
 | 
	
		
			
				|  |  |                          bodyBuilder.endObject();
 | 
	
		
			
				|  |  |                      }
 | 
	
		
			
				|  |  |                      bodyBuilder.endObject();
 | 
	
		
			
				|  |  | +                    request.setJsonEntity(Strings.toString(bodyBuilder));
 | 
	
		
			
				|  |  |                  }
 | 
	
		
			
				|  |  | -                bodyBuilder.endObject();
 | 
	
		
			
				|  |  | -                request.setJsonEntity(Strings.toString(bodyBuilder));
 | 
	
		
			
				|  |  | +                assertOK(leaderClient.performRequest(request));
 | 
	
		
			
				|  |  | +                // Then create the actual auto follow pattern:
 | 
	
		
			
				|  |  | +                createAutoFollowPattern(leaderClient, "id2", "logs-*-na", "follower_cluster");
 | 
	
		
			
				|  |  |              }
 | 
	
		
			
				|  |  | -            assertOK(leaderClient.performRequest(request));
 | 
	
		
			
				|  |  | -            // Then create the actual auto follow pattern:
 | 
	
		
			
				|  |  | -            createAutoFollowPattern(leaderClient, "id2", "logs-*-na", "follower_cluster");
 | 
	
		
			
				|  |  | -        }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        var numDocs = 128;
 | 
	
		
			
				|  |  | -        var leaderDataStreamName = "logs-http-eu";
 | 
	
		
			
				|  |  | -        // Create data stream in leader cluster and ensure it is followed in follow cluster
 | 
	
		
			
				|  |  | -        {
 | 
	
		
			
				|  |  | +            var numDocs = 128;
 | 
	
		
			
				|  |  | +            // Create data stream in leader cluster and ensure it is followed in follow cluster
 | 
	
		
			
				|  |  |              try (var leaderClient = buildLeaderClient()) {
 | 
	
		
			
				|  |  |                  for (int i = 0; i < numDocs; i++) {
 | 
	
		
			
				|  |  |                      Request indexRequest = new Request("POST", "/" + leaderDataStreamName + "/_doc");
 | 
	
	
		
			
				|  | @@ -592,9 +632,6 @@ public class AutoFollowIT extends ESCCRRestTestCase {
 | 
	
		
			
				|  |  |                  ensureYellow(leaderDataStreamName);
 | 
	
		
			
				|  |  |                  verifyDocuments(client(), leaderDataStreamName, numDocs);
 | 
	
		
			
				|  |  |              });
 | 
	
		
			
				|  |  | -        }
 | 
	
		
			
				|  |  | -        var followerDataStreamName = "logs-http-na";
 | 
	
		
			
				|  |  | -        {
 | 
	
		
			
				|  |  |              for (int i = 0; i < numDocs; i++) {
 | 
	
		
			
				|  |  |                  var indexRequest = new Request("POST", "/" + followerDataStreamName + "/_doc");
 | 
	
		
			
				|  |  |                  indexRequest.addParameter("refresh", "true");
 | 
	
	
		
			
				|  | @@ -611,68 +648,69 @@ public class AutoFollowIT extends ESCCRRestTestCase {
 | 
	
		
			
				|  |  |                      verifyDocuments(leaderClient, followerDataStreamName, numDocs);
 | 
	
		
			
				|  |  |                  });
 | 
	
		
			
				|  |  |              }
 | 
	
		
			
				|  |  | -        }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        // TODO: Replace these verifyDocuments(...) assertions with searches via 'logs-http' alias and
 | 
	
		
			
				|  |  | -        // writes via 'logs-http' alias (ensuring write goes to write data stream).
 | 
	
		
			
				|  |  | -        // Currently aliases can't refer to data streams, so we can't fully test the bi-direction replication scenario.
 | 
	
		
			
				|  |  | -        // See: https://github.com/elastic/elasticsearch/pull/64710#discussion_r537210322
 | 
	
		
			
				|  |  | +            // TODO: Replace these verifyDocuments(...) assertions with searches via 'logs-http' alias and
 | 
	
		
			
				|  |  | +            // writes via 'logs-http' alias (ensuring write goes to write data stream).
 | 
	
		
			
				|  |  | +            // Currently aliases can't refer to data streams, so we can't fully test the bi-direction replication scenario.
 | 
	
		
			
				|  |  | +            // See: https://github.com/elastic/elasticsearch/pull/64710#discussion_r537210322
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        // See all eu and na logs in leader and follower cluster:
 | 
	
		
			
				|  |  | -        verifyDocuments(client(), "logs-http*", numDocs * 2);
 | 
	
		
			
				|  |  | -        try (var leaderClient = buildLeaderClient()) {
 | 
	
		
			
				|  |  | -            verifyDocuments(leaderClient, "logs-http*", numDocs * 2);
 | 
	
		
			
				|  |  | -        }
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -        int moreDocs = 48;
 | 
	
		
			
				|  |  | -        // Index more docs into leader cluster
 | 
	
		
			
				|  |  | -        {
 | 
	
		
			
				|  |  | +            // See all eu and na logs in leader and follower cluster:
 | 
	
		
			
				|  |  | +            verifyDocuments(client(), "logs-http*", numDocs * 2);
 | 
	
		
			
				|  |  |              try (var leaderClient = buildLeaderClient()) {
 | 
	
		
			
				|  |  | +                verifyDocuments(leaderClient, "logs-http*", numDocs * 2);
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            int moreDocs = 48;
 | 
	
		
			
				|  |  | +            // Index more docs into leader cluster
 | 
	
		
			
				|  |  | +            {
 | 
	
		
			
				|  |  | +                try (var leaderClient = buildLeaderClient()) {
 | 
	
		
			
				|  |  | +                    for (int i = 0; i < moreDocs; i++) {
 | 
	
		
			
				|  |  | +                        var indexRequest = new Request("POST", "/" + leaderDataStreamName + "/_doc");
 | 
	
		
			
				|  |  | +                        indexRequest.addParameter("refresh", "true");
 | 
	
		
			
				|  |  | +                        indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}");
 | 
	
		
			
				|  |  | +                        assertOK(leaderClient.performRequest(indexRequest));
 | 
	
		
			
				|  |  | +                    }
 | 
	
		
			
				|  |  | +                    verifyDocuments(leaderClient, leaderDataStreamName, numDocs + moreDocs);
 | 
	
		
			
				|  |  | +                }
 | 
	
		
			
				|  |  | +                assertBusy(() -> {
 | 
	
		
			
				|  |  | +                    verifyDocuments(client(), leaderDataStreamName, numDocs + moreDocs);
 | 
	
		
			
				|  |  | +                });
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +            // Index more docs into follower cluster
 | 
	
		
			
				|  |  | +            {
 | 
	
		
			
				|  |  |                  for (int i = 0; i < moreDocs; i++) {
 | 
	
		
			
				|  |  | -                    var indexRequest = new Request("POST", "/" + leaderDataStreamName + "/_doc");
 | 
	
		
			
				|  |  | +                    var indexRequest = new Request("POST", "/" + followerDataStreamName + "/_doc");
 | 
	
		
			
				|  |  |                      indexRequest.addParameter("refresh", "true");
 | 
	
		
			
				|  |  |                      indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}");
 | 
	
		
			
				|  |  | -                    assertOK(leaderClient.performRequest(indexRequest));
 | 
	
		
			
				|  |  | +                    assertOK(client().performRequest(indexRequest));
 | 
	
		
			
				|  |  | +                }
 | 
	
		
			
				|  |  | +                verifyDocuments(client(), followerDataStreamName, numDocs + moreDocs);
 | 
	
		
			
				|  |  | +                try (var leaderClient = buildLeaderClient()) {
 | 
	
		
			
				|  |  | +                    assertBusy(() -> {
 | 
	
		
			
				|  |  | +                        verifyDocuments(leaderClient, followerDataStreamName, numDocs + moreDocs);
 | 
	
		
			
				|  |  | +                    });
 | 
	
		
			
				|  |  |                  }
 | 
	
		
			
				|  |  | -                verifyDocuments(leaderClient, leaderDataStreamName, numDocs + moreDocs);
 | 
	
		
			
				|  |  | -            }
 | 
	
		
			
				|  |  | -            assertBusy(() -> {
 | 
	
		
			
				|  |  | -                verifyDocuments(client(), leaderDataStreamName, numDocs + moreDocs);
 | 
	
		
			
				|  |  | -            });
 | 
	
		
			
				|  |  | -        }
 | 
	
		
			
				|  |  | -        // Index more docs into follower cluster
 | 
	
		
			
				|  |  | -        {
 | 
	
		
			
				|  |  | -            for (int i = 0; i < moreDocs; i++) {
 | 
	
		
			
				|  |  | -                var indexRequest = new Request("POST", "/" + followerDataStreamName + "/_doc");
 | 
	
		
			
				|  |  | -                indexRequest.addParameter("refresh", "true");
 | 
	
		
			
				|  |  | -                indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}");
 | 
	
		
			
				|  |  | -                assertOK(client().performRequest(indexRequest));
 | 
	
		
			
				|  |  | -            }
 | 
	
		
			
				|  |  | -            verifyDocuments(client(), followerDataStreamName, numDocs + moreDocs);
 | 
	
		
			
				|  |  | -            try (var leaderClient = buildLeaderClient()) {
 | 
	
		
			
				|  |  | -                assertBusy(() -> {
 | 
	
		
			
				|  |  | -                    verifyDocuments(leaderClient, followerDataStreamName, numDocs + moreDocs);
 | 
	
		
			
				|  |  | -                });
 | 
	
		
			
				|  |  |              }
 | 
	
		
			
				|  |  | -        }
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -        // TODO: Replace these verifyDocuments(...) assertions with searches via 'logs-http' alias and writes via 'logs-http'
 | 
	
		
			
				|  |  | -        // (see previous TODO)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        // See all eu and na logs in leader and follower cluster:
 | 
	
		
			
				|  |  | -        verifyDocuments(client(), "logs-http*", (numDocs + moreDocs) * 2);
 | 
	
		
			
				|  |  | -        try (RestClient leaderClient = buildLeaderClient()) {
 | 
	
		
			
				|  |  | -            verifyDocuments(leaderClient, "logs-http*", (numDocs + moreDocs) * 2);
 | 
	
		
			
				|  |  | -        }
 | 
	
		
			
				|  |  | +            // TODO: Replace these verifyDocuments(...) assertions with searches via 'logs-http' alias and writes via 'logs-http'
 | 
	
		
			
				|  |  | +            // (see previous TODO)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        // Cleanup:
 | 
	
		
			
				|  |  | -        {
 | 
	
		
			
				|  |  | -            deleteAutoFollowPattern(client(), "id1");
 | 
	
		
			
				|  |  | -            deleteDataStream(client(), followerDataStreamName);
 | 
	
		
			
				|  |  | +            // See all eu and na logs in leader and follower cluster:
 | 
	
		
			
				|  |  | +            verifyDocuments(client(), "logs-http*", (numDocs + moreDocs) * 2);
 | 
	
		
			
				|  |  |              try (RestClient leaderClient = buildLeaderClient()) {
 | 
	
		
			
				|  |  | -                deleteDataStream(leaderClient, leaderDataStreamName);
 | 
	
		
			
				|  |  | -                deleteAutoFollowPattern(leaderClient, "id2");
 | 
	
		
			
				|  |  | +                verifyDocuments(leaderClient, "logs-http*", (numDocs + moreDocs) * 2);
 | 
	
		
			
				|  |  |              }
 | 
	
		
			
				|  |  | +        } finally {
 | 
	
		
			
				|  |  | +            cleanUpFollower(
 | 
	
		
			
				|  |  | +                List.of(backingIndexName(followerDataStreamName, 1), backingIndexName(leaderDataStreamName, 1)),
 | 
	
		
			
				|  |  | +                List.of(followerDataStreamName, leaderDataStreamName),
 | 
	
		
			
				|  |  | +                List.of("id1")
 | 
	
		
			
				|  |  | +            );
 | 
	
		
			
				|  |  | +            cleanUpLeader(
 | 
	
		
			
				|  |  | +                List.of(backingIndexName(leaderDataStreamName, 1), backingIndexName(followerDataStreamName, 1)),
 | 
	
		
			
				|  |  | +                List.of(leaderDataStreamName, followerDataStreamName),
 | 
	
		
			
				|  |  | +                List.of("id2")
 | 
	
		
			
				|  |  | +            );
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -762,18 +800,11 @@ public class AutoFollowIT extends ESCCRRestTestCase {
 | 
	
		
			
				|  |  |          return (Integer) response.get("number_of_successful_follow_indices");
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    private void deleteDataStream(String name) throws IOException {
 | 
	
		
			
				|  |  | -        try (RestClient leaderClient = buildLeaderClient()) {
 | 
	
		
			
				|  |  | -            Request deleteTemplateRequest = new Request("DELETE", "/_data_stream/" + name);
 | 
	
		
			
				|  |  | -            assertOK(leaderClient.performRequest(deleteTemplateRequest));
 | 
	
		
			
				|  |  | -        }
 | 
	
		
			
				|  |  | -    }
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  |      private void deleteDataStream(RestClient client, String name) throws IOException {
 | 
	
		
			
				|  |  |          Request deleteTemplateRequest = new Request("DELETE", "/_data_stream/" + name);
 | 
	
		
			
				|  |  |          assertOK(client.performRequest(deleteTemplateRequest));
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  | -  
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |      private Response getAutoFollowStats() throws IOException {
 | 
	
		
			
				|  |  |          final Request statsRequest = new Request("GET", "/_ccr/stats");
 | 
	
		
			
				|  |  |          statsRequest.addParameter("pretty", Boolean.TRUE.toString());
 | 
	
	
		
			
				|  | @@ -794,7 +825,7 @@ public class AutoFollowIT extends ESCCRRestTestCase {
 | 
	
		
			
				|  |  |              throw ae;
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  | -  
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |      @Override
 | 
	
		
			
				|  |  |      protected Settings restClientSettings() {
 | 
	
		
			
				|  |  |          String token = basicAuthHeaderValue("admin", new SecureString("admin-password".toCharArray()));
 | 
	
	
		
			
				|  | @@ -802,4 +833,61 @@ public class AutoFollowIT extends ESCCRRestTestCase {
 | 
	
		
			
				|  |  |              .put(ThreadContext.PREFIX + ".Authorization", token)
 | 
	
		
			
				|  |  |              .build();
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    private void cleanUpFollower(
 | 
	
		
			
				|  |  | +        final List<String> indices,
 | 
	
		
			
				|  |  | +        final List<String> dataStreams,
 | 
	
		
			
				|  |  | +        final List<String> autoFollowPatterns
 | 
	
		
			
				|  |  | +    ) {
 | 
	
		
			
				|  |  | +        cleanUp(adminClient(), indices, dataStreams, autoFollowPatterns);
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    private void cleanUpMiddle(
 | 
	
		
			
				|  |  | +        final List<String> indices,
 | 
	
		
			
				|  |  | +        final List<String> dataStreams,
 | 
	
		
			
				|  |  | +        final List<String> autoFollowPatterns
 | 
	
		
			
				|  |  | +    ) throws IOException {
 | 
	
		
			
				|  |  | +        try (RestClient middleClient = buildMiddleClient()) {
 | 
	
		
			
				|  |  | +            cleanUp(middleClient, indices, dataStreams, autoFollowPatterns);
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    private void cleanUpLeader(
 | 
	
		
			
				|  |  | +        final List<String> indices,
 | 
	
		
			
				|  |  | +        final List<String> dataStreams,
 | 
	
		
			
				|  |  | +        final List<String> autoFollowPatterns
 | 
	
		
			
				|  |  | +    ) throws IOException {
 | 
	
		
			
				|  |  | +        try (RestClient leaderClient = buildLeaderClient()) {
 | 
	
		
			
				|  |  | +            cleanUp(leaderClient, indices, dataStreams, autoFollowPatterns);
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    private void cleanUp(
 | 
	
		
			
				|  |  | +        final RestClient client,
 | 
	
		
			
				|  |  | +        final List<String> indices,
 | 
	
		
			
				|  |  | +        final List<String> dataStreams,
 | 
	
		
			
				|  |  | +        final List<String> autoFollowPatterns
 | 
	
		
			
				|  |  | +    ) {
 | 
	
		
			
				|  |  | +        for (String autoFollowPattern : autoFollowPatterns) {
 | 
	
		
			
				|  |  | +            try {
 | 
	
		
			
				|  |  | +                deleteAutoFollowPattern(client, autoFollowPattern);
 | 
	
		
			
				|  |  | +            } catch (IOException e) {
 | 
	
		
			
				|  |  | +                logger.warn(() -> new ParameterizedMessage("failed to delete auto-follow pattern [{}] after test", autoFollowPattern), e);
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +        for (String dataStream : dataStreams) {
 | 
	
		
			
				|  |  | +            try {
 | 
	
		
			
				|  |  | +                deleteDataStream(client, dataStream);
 | 
	
		
			
				|  |  | +            } catch (IOException e) {
 | 
	
		
			
				|  |  | +                logger.warn(() -> new ParameterizedMessage("failed to delete data stream [{}] after test", dataStream), e);
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +        for (String index : indices) {
 | 
	
		
			
				|  |  | +            try {
 | 
	
		
			
				|  |  | +                deleteIndex(client, index);
 | 
	
		
			
				|  |  | +            } catch (IOException e) {
 | 
	
		
			
				|  |  | +                logger.warn(() -> new ParameterizedMessage("failed to delete index [{}] after test", index), e);
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  |  }
 |