Browse Source

[8.19] [ILM]: Fix TSDS unfollow timing with WaitUntilTimeSeriesEndTimePassesStep (#128361) (#129518)

* [ILM]: Fix TSDS unfollow timing with WaitUntilTimeSeriesEndTimePassesStep (#128361)

The backing indices of a time series data stream (TSDS) have time ranges (start_time & end_time), and they include the documents that belong to these time ranges.

To ensure that we will not unfollow a leader TSDS index before the indexing is complete, we need to add a WaitUntilTimeSeriesEndTimePassesStep to the unfollow action. This will ensure that we will only unfollow after the end_time has passed.

This creates some odd semantics when the rollover and the unfollow are combined, because we need the rollover of the leader index to finalise the end_time, but the unfollow action is injected before the rollover. However, this should be fine: the leader index will skip the unfollow action, so it will roll over and finalise the end_time, and the follower index will wait for the end_time to pass before it unfollows. Rolling over the follower index will have no effect since it’s already rolled over.

(cherry picked from commit ed7f2ca8df572020f8453f0d34899701190ad300)

# Conflicts:
#	x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/UnfollowAction.java

* Fix missing method

* [CI] Auto commit changes from spotless

---------

Co-authored-by: 안수빈 <76802855+happysubin@users.noreply.github.com>
Co-authored-by: elasticsearchmachine <infra-root+elasticsearchmachine@elastic.co>
Sam Xiao 3 months ago
parent
commit
5b2c31733d

+ 6 - 0
docs/changelog/128361.yaml

@@ -0,0 +1,6 @@
+pr: 128361
+summary: The follower index should wait until the time series end time passes before unfollowing the leader index.
+area: ILM+SLM
+type: bug
+issues:
+ - 128129

+ 9 - 2
test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java

@@ -2116,9 +2116,16 @@ public abstract class ESRestTestCase extends ESTestCase {
     /**
      * Returns a list of the data stream's backing index names.
      */
-    @SuppressWarnings("unchecked")
     protected static List<String> getDataStreamBackingIndexNames(String dataStreamName) throws IOException {
-        Map<String, Object> response = getAsMap(client(), "/_data_stream/" + dataStreamName);
+        return getDataStreamBackingIndexNames(client(), dataStreamName);
+    }
+
+    /**
+     * Returns a list of the data stream's backing index names.
+     */
+    @SuppressWarnings("unchecked")
+    protected static List<String> getDataStreamBackingIndexNames(RestClient client, String dataStreamName) throws IOException {
+        Map<String, Object> response = getAsMap(client, "/_data_stream/" + dataStreamName);
         List<?> dataStreams = (List<?>) response.get("data_streams");
         assertThat(dataStreams.size(), equalTo(1));
         Map<?, ?> dataStream = (Map<?, ?>) dataStreams.get(0);

+ 15 - 8
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/UnfollowAction.java

@@ -17,6 +17,7 @@ import org.elasticsearch.xcontent.XContentParser;
 import org.elasticsearch.xpack.core.ilm.Step.StepKey;
 
 import java.io.IOException;
+import java.time.Instant;
 import java.util.List;
 import java.util.Map;
 
@@ -44,6 +45,7 @@ public final class UnfollowAction implements LifecycleAction {
     public List<Step> toSteps(Client client, String phase, StepKey nextStepKey) {
         StepKey preUnfollowKey = new StepKey(phase, NAME, CONDITIONAL_UNFOLLOW_STEP);
         StepKey indexingComplete = new StepKey(phase, NAME, WaitForIndexingCompleteStep.NAME);
+        StepKey waitUntilTimeSeriesEndTimePassesStep = new StepKey(phase, NAME, WaitUntilTimeSeriesEndTimePassesStep.NAME);
         StepKey waitForFollowShardTasks = new StepKey(phase, NAME, WaitForFollowShardTasksStep.NAME);
         StepKey pauseFollowerIndex = new StepKey(phase, NAME, PauseFollowerIndexStep.NAME);
         StepKey closeFollowerIndex = new StepKey(phase, NAME, CloseFollowerIndexStep.NAME);
@@ -64,14 +66,19 @@ public final class UnfollowAction implements LifecycleAction {
                 return customIndexMetadata == null;
             }
         );
-        WaitForIndexingCompleteStep step1 = new WaitForIndexingCompleteStep(indexingComplete, waitForFollowShardTasks);
-        WaitForFollowShardTasksStep step2 = new WaitForFollowShardTasksStep(waitForFollowShardTasks, pauseFollowerIndex, client);
-        PauseFollowerIndexStep step3 = new PauseFollowerIndexStep(pauseFollowerIndex, closeFollowerIndex, client);
-        CloseFollowerIndexStep step4 = new CloseFollowerIndexStep(closeFollowerIndex, unfollowFollowerIndex, client);
-        UnfollowFollowerIndexStep step5 = new UnfollowFollowerIndexStep(unfollowFollowerIndex, openFollowerIndex, client);
-        OpenIndexStep step6 = new OpenIndexStep(openFollowerIndex, waitForYellowStep, client);
-        WaitForIndexColorStep step7 = new WaitForIndexColorStep(waitForYellowStep, nextStepKey, ClusterHealthStatus.YELLOW);
-        return List.of(conditionalSkipUnfollowStep, step1, step2, step3, step4, step5, step6, step7);
+        WaitForIndexingCompleteStep step1 = new WaitForIndexingCompleteStep(indexingComplete, waitUntilTimeSeriesEndTimePassesStep);
+        WaitUntilTimeSeriesEndTimePassesStep step2 = new WaitUntilTimeSeriesEndTimePassesStep(
+            waitUntilTimeSeriesEndTimePassesStep,
+            waitForFollowShardTasks,
+            Instant::now
+        );
+        WaitForFollowShardTasksStep step3 = new WaitForFollowShardTasksStep(waitForFollowShardTasks, pauseFollowerIndex, client);
+        PauseFollowerIndexStep step4 = new PauseFollowerIndexStep(pauseFollowerIndex, closeFollowerIndex, client);
+        CloseFollowerIndexStep step5 = new CloseFollowerIndexStep(closeFollowerIndex, unfollowFollowerIndex, client);
+        UnfollowFollowerIndexStep step6 = new UnfollowFollowerIndexStep(unfollowFollowerIndex, openFollowerIndex, client);
+        OpenIndexStep step7 = new OpenIndexStep(openFollowerIndex, waitForYellowStep, client);
+        WaitForIndexColorStep step8 = new WaitForIndexColorStep(waitForYellowStep, nextStepKey, ClusterHealthStatus.YELLOW);
+        return List.of(conditionalSkipUnfollowStep, step1, step2, step3, step4, step5, step6, step7, step8);
     }
 
     @Override

+ 20 - 15
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/UnfollowActionTests.java

@@ -51,16 +51,17 @@ public class UnfollowActionTests extends AbstractActionTestCase<UnfollowAction>
         );
         List<Step> steps = action.toSteps(null, phase, nextStepKey);
         assertThat(steps, notNullValue());
-        assertThat(steps.size(), equalTo(8));
+        assertThat(steps.size(), equalTo(9));
 
         StepKey expectedFirstStepKey = new StepKey(phase, UnfollowAction.NAME, UnfollowAction.CONDITIONAL_UNFOLLOW_STEP);
         StepKey expectedSecondStepKey = new StepKey(phase, UnfollowAction.NAME, WaitForIndexingCompleteStep.NAME);
-        StepKey expectedThirdStepKey = new StepKey(phase, UnfollowAction.NAME, WaitForFollowShardTasksStep.NAME);
-        StepKey expectedFourthStepKey = new StepKey(phase, UnfollowAction.NAME, PauseFollowerIndexStep.NAME);
-        StepKey expectedFifthStepKey = new StepKey(phase, UnfollowAction.NAME, CloseFollowerIndexStep.NAME);
-        StepKey expectedSixthStepKey = new StepKey(phase, UnfollowAction.NAME, UnfollowFollowerIndexStep.NAME);
-        StepKey expectedSeventhStepKey = new StepKey(phase, UnfollowAction.NAME, OPEN_FOLLOWER_INDEX_STEP_NAME);
-        StepKey expectedEighthStepKey = new StepKey(phase, UnfollowAction.NAME, WaitForIndexColorStep.NAME);
+        StepKey expectedThirdStepKey = new StepKey(phase, UnfollowAction.NAME, WaitUntilTimeSeriesEndTimePassesStep.NAME);
+        StepKey expectedFourthStepKey = new StepKey(phase, UnfollowAction.NAME, WaitForFollowShardTasksStep.NAME);
+        StepKey expectedFifthStepKey = new StepKey(phase, UnfollowAction.NAME, PauseFollowerIndexStep.NAME);
+        StepKey expectedSixthStepKey = new StepKey(phase, UnfollowAction.NAME, CloseFollowerIndexStep.NAME);
+        StepKey expectedSeventhStepKey = new StepKey(phase, UnfollowAction.NAME, UnfollowFollowerIndexStep.NAME);
+        StepKey expectedEighthStepKey = new StepKey(phase, UnfollowAction.NAME, OPEN_FOLLOWER_INDEX_STEP_NAME);
+        StepKey expectedNinthStepKey = new StepKey(phase, UnfollowAction.NAME, WaitForIndexColorStep.NAME);
 
         BranchingStep firstStep = (BranchingStep) steps.get(0);
         assertThat(firstStep.getKey(), equalTo(expectedFirstStepKey));
@@ -69,30 +70,34 @@ public class UnfollowActionTests extends AbstractActionTestCase<UnfollowAction>
         assertThat(secondStep.getKey(), equalTo(expectedSecondStepKey));
         assertThat(secondStep.getNextStepKey(), equalTo(expectedThirdStepKey));
 
-        WaitForFollowShardTasksStep thirdStep = (WaitForFollowShardTasksStep) steps.get(2);
+        WaitUntilTimeSeriesEndTimePassesStep thirdStep = (WaitUntilTimeSeriesEndTimePassesStep) steps.get(2);
         assertThat(thirdStep.getKey(), equalTo(expectedThirdStepKey));
         assertThat(thirdStep.getNextStepKey(), equalTo(expectedFourthStepKey));
 
-        PauseFollowerIndexStep fourthStep = (PauseFollowerIndexStep) steps.get(3);
+        WaitForFollowShardTasksStep fourthStep = (WaitForFollowShardTasksStep) steps.get(3);
         assertThat(fourthStep.getKey(), equalTo(expectedFourthStepKey));
         assertThat(fourthStep.getNextStepKey(), equalTo(expectedFifthStepKey));
 
-        CloseFollowerIndexStep fifthStep = (CloseFollowerIndexStep) steps.get(4);
+        PauseFollowerIndexStep fifthStep = (PauseFollowerIndexStep) steps.get(4);
         assertThat(fifthStep.getKey(), equalTo(expectedFifthStepKey));
         assertThat(fifthStep.getNextStepKey(), equalTo(expectedSixthStepKey));
 
-        UnfollowFollowerIndexStep sixthStep = (UnfollowFollowerIndexStep) steps.get(5);
+        CloseFollowerIndexStep sixthStep = (CloseFollowerIndexStep) steps.get(5);
         assertThat(sixthStep.getKey(), equalTo(expectedSixthStepKey));
         assertThat(sixthStep.getNextStepKey(), equalTo(expectedSeventhStepKey));
 
-        OpenIndexStep seventhStep = (OpenIndexStep) steps.get(6);
+        UnfollowFollowerIndexStep seventhStep = (UnfollowFollowerIndexStep) steps.get(6);
         assertThat(seventhStep.getKey(), equalTo(expectedSeventhStepKey));
         assertThat(seventhStep.getNextStepKey(), equalTo(expectedEighthStepKey));
 
-        WaitForIndexColorStep eighthStep = (WaitForIndexColorStep) steps.get(7);
-        assertThat(eighthStep.getColor(), is(ClusterHealthStatus.YELLOW));
+        OpenIndexStep eighthStep = (OpenIndexStep) steps.get(7);
         assertThat(eighthStep.getKey(), equalTo(expectedEighthStepKey));
-        assertThat(eighthStep.getNextStepKey(), equalTo(nextStepKey));
+        assertThat(eighthStep.getNextStepKey(), equalTo(expectedNinthStepKey));
+
+        WaitForIndexColorStep ninth = (WaitForIndexColorStep) steps.get(8);
+        assertThat(ninth.getColor(), is(ClusterHealthStatus.YELLOW));
+        assertThat(ninth.getKey(), equalTo(expectedNinthStepKey));
+        assertThat(ninth.getNextStepKey(), equalTo(nextStepKey));
     }
 
     @Override

+ 142 - 0
x-pack/plugin/ilm/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ilm/CCRIndexLifecycleIT.java

@@ -17,6 +17,8 @@ import org.elasticsearch.client.ResponseException;
 import org.elasticsearch.client.RestClient;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.time.DateFormatter;
+import org.elasticsearch.common.time.FormatNames;
 import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.rest.RestStatus;
@@ -28,9 +30,11 @@ import org.elasticsearch.xpack.core.ilm.LifecycleAction;
 import org.elasticsearch.xpack.core.ilm.LifecyclePolicy;
 import org.elasticsearch.xpack.core.ilm.Phase;
 import org.elasticsearch.xpack.core.ilm.UnfollowAction;
+import org.elasticsearch.xpack.core.ilm.WaitUntilTimeSeriesEndTimePassesStep;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.time.Instant;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -39,7 +43,9 @@ import java.util.concurrent.TimeUnit;
 
 import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
 import static org.elasticsearch.xpack.core.ilm.ShrinkIndexNameSupplier.SHRUNKEN_INDEX_PREFIX;
+import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
@@ -47,6 +53,37 @@ import static org.hamcrest.Matchers.nullValue;
 public class CCRIndexLifecycleIT extends ESCCRRestTestCase {
 
     private static final Logger LOGGER = LogManager.getLogger(CCRIndexLifecycleIT.class);
+    private static final String TSDB_INDEX_TEMPLATE = """
+        {
+            "index_patterns": ["%s*"],
+            "data_stream": {},
+            "template": {
+                "settings":{
+                    "index": {
+                        "number_of_replicas": 0,
+                        "number_of_shards": 1,
+                        "routing_path": ["metricset"],
+                        "mode": "time_series"
+                    },
+                    "index.lifecycle.name": "%s"
+                },
+                "mappings":{
+                    "properties": {
+                        "@timestamp" : {
+                            "type": "date"
+                        },
+                        "metricset": {
+                            "type": "keyword",
+                            "time_series_dimension": true
+                        },
+                        "volume": {
+                            "type": "double",
+                            "time_series_metric": "gauge"
+                        }
+                    }
+                }
+            }
+        }""";
 
     public void testBasicCCRAndILMIntegration() throws Exception {
         String indexName = "logs-1";
@@ -533,6 +570,91 @@ public class CCRIndexLifecycleIT extends ESCCRRestTestCase {
         }
     }
 
+    @SuppressWarnings("unchecked")
+    public void testTsdbLeaderIndexRolloverAndSyncAfterWaitUntilEndTime() throws Exception {
+        String indexPattern = "tsdb-index-";
+        String dataStream = "tsdb-index-cpu";
+        String policyName = "tsdb-policy";
+
+        if ("leader".equals(targetCluster)) {
+            putILMPolicy(policyName, null, 1, null);
+            Request templateRequest = new Request("PUT", "/_index_template/tsdb_template");
+            templateRequest.setJsonEntity(Strings.format(TSDB_INDEX_TEMPLATE, indexPattern, policyName));
+            assertOK(client().performRequest(templateRequest));
+        } else if ("follow".equals(targetCluster)) {
+            // Use unfollow-only policy for follower cluster instead of regular ILM policy
+            // Follower clusters should not have their own rollover actions as they are meant
+            // to follow the rollover behavior of the leader index, not initiate their own rollovers
+            putUnfollowOnlyPolicy(client(), policyName);
+
+            Request createAutoFollowRequest = new Request("PUT", "/_ccr/auto_follow/tsdb_index_auto_follow_pattern");
+            createAutoFollowRequest.setJsonEntity("""
+                {
+                    "leader_index_patterns": [ "tsdb-index-*" ],
+                    "remote_cluster": "leader_cluster",
+                    "read_poll_timeout": "1000ms",
+                    "follow_index_pattern": "{{leader_index}}"
+                }""");
+            assertOK(client().performRequest(createAutoFollowRequest));
+
+            try (RestClient leaderClient = buildLeaderClient()) {
+                String now = DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).format(Instant.now());
+
+                // Index a document on the leader index, this should trigger an ILM rollover.
+                // This will ensure that 'index.lifecycle.indexing_complete' is set.
+                index(leaderClient, dataStream, "", "@timestamp", now, "volume", 11.0, "metricset", randomAlphaOfLength(5));
+
+                String backingIndexName = getDataStreamBackingIndexNames(leaderClient, "tsdb-index-cpu").get(0);
+                assertBusy(() -> assertOK(client().performRequest(new Request("HEAD", "/" + backingIndexName))));
+
+                assertBusy(() -> {
+                    Map<String, Object> indexExplanation = explainIndex(client(), backingIndexName);
+                    assertThat(
+                        "index must wait in the " + WaitUntilTimeSeriesEndTimePassesStep.NAME + " until its end time lapses",
+                        indexExplanation.get("step"),
+                        is(WaitUntilTimeSeriesEndTimePassesStep.NAME)
+                    );
+
+                    assertThat(indexExplanation.get("step_info"), is(notNullValue()));
+                    assertThat(
+                        (String) ((Map<String, Object>) indexExplanation.get("step_info")).get("message"),
+                        containsString("Waiting until the index's time series end time lapses")
+                    );
+                }, 30, TimeUnit.SECONDS);
+
+                int initialLeaderDocCount = getDocCount(leaderClient, backingIndexName);
+
+                // Add more documents to the leader index while it's in WaitUntilTimeSeriesEndTimePassesStep
+                String futureTimestamp = DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName())
+                    .format(Instant.now().plusSeconds(30));
+
+                for (int i = 0; i < 5; i++) {
+                    index(leaderClient, dataStream, "", "@timestamp", futureTimestamp, "volume", 20.0 + i, "metricset", "test-sync-" + i);
+                }
+
+                // Verify that new documents are synced to follower while in WaitUntilTimeSeriesEndTimePassesStep
+                assertBusy(() -> {
+                    int currentLeaderDocCount = getDocCount(leaderClient, backingIndexName);
+                    int currentFollowerDocCount = getDocCount(client(), backingIndexName);
+
+                    assertThat(
+                        "Leader should have more documents than initially",
+                        currentLeaderDocCount,
+                        greaterThan(initialLeaderDocCount)
+                    );
+                    assertThat("Follower should sync new documents from leader", currentFollowerDocCount, equalTo(currentLeaderDocCount));
+
+                    // Also verify the step is still WaitUntilTimeSeriesEndTimePassesStep
+                    assertThat(
+                        "Index should still be in WaitUntilTimeSeriesEndTimePassesStep",
+                        explainIndex(client(), backingIndexName).get("step"),
+                        is(WaitUntilTimeSeriesEndTimePassesStep.NAME)
+                    );
+                }, 30, TimeUnit.SECONDS);
+            }
+        }
+    }
+
     private void configureRemoteClusters(String name, String leaderRemoteClusterSeed) throws IOException {
         logger.info("Configuring leader remote cluster [{}]", leaderRemoteClusterSeed);
         Request request = new Request("PUT", "/_cluster/settings");
@@ -839,4 +961,24 @@ public class CCRIndexLifecycleIT extends ESCCRRestTestCase {
             : "lifecycle execution state must contain the target shrink index name for index [" + originalIndex + "]";
         return shrunkenIndexName[0];
     }
+
+    private static Map<String, Object> explainIndex(RestClient client, String indexName) throws IOException {
+        Request explainRequest = new Request("GET", indexName + "/_ilm/explain");
+        Response response = client.performRequest(explainRequest);
+        Map<String, Object> responseMap;
+        try (InputStream is = response.getEntity().getContent()) {
+            responseMap = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true);
+        }
+
+        @SuppressWarnings("unchecked")
+        Map<String, Map<String, Object>> indexResponse = ((Map<String, Map<String, Object>>) responseMap.get("indices"));
+        return indexResponse.get(indexName);
+    }
+
+    private static int getDocCount(RestClient client, String indexName) throws IOException {
+        Request countRequest = new Request("GET", "/" + indexName + "/_count");
+        Response response = client.performRequest(countRequest);
+        Map<String, Object> result = entityAsMap(response);
+        return (int) result.get("count");
+    }
 }