浏览代码

[TSDB] Add validations for the downsampling ILM action (#90295)

Add validations on downsampling intervals when there are more than phases with downsampling actions (rollups of rollups case).

The rules that we enforce are the following:

    The latter interval must be greater than the previous interval
    The latter interval must be a multiple of the previous interval
Christos Soulios 3 年之前
父节点
当前提交
eb0856393c

+ 5 - 0
docs/changelog/90295.yaml

@@ -0,0 +1,5 @@
+pr: 90295
+summary: "Add validations for the downsampling ILM action"
+area: "ILM+SLM"
+type: enhancement
+issues: []

+ 63 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/TimeseriesLifecycleType.java

@@ -10,6 +10,7 @@ import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.util.set.Sets;
 import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.core.Tuple;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -25,6 +26,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -322,6 +324,7 @@ public class TimeseriesLifecycleType implements LifecycleType {
         validateActionsFollowingSearchableSnapshot(phases);
         validateAllSearchableSnapshotActionsUseSameRepository(phases);
         validateFrozenPhaseHasSearchableSnapshotAction(phases);
+        validateDownsamplingIntervals(phases);
     }
 
     static void validateActionsFollowingSearchableSnapshot(Collection<Phase> phases) {
@@ -489,6 +492,66 @@ public class TimeseriesLifecycleType implements LifecycleType {
         });
     }
 
+    /**
+     * Add validations if there are multiple downsample actions on different phases. The rules that we
+     * enforce are the following:
+     *   - The latter interval must be greater than the previous interval
+     *   - The latter interval must be a multiple of the previous interval
+     */
+    static void validateDownsamplingIntervals(Collection<Phase> phases) {
+        Map<String, Phase> phasesWithDownsamplingActions = phases.stream()
+            .filter(phase -> phase.getActions().containsKey(DownsampleAction.NAME))
+            .collect(Collectors.toMap(Phase::getName, Function.identity()));
+
+        if (phasesWithDownsamplingActions.size() < 2) {
+            // Interval validations must be executed when there are at least two downsample actions, otherwise return
+            return;
+        }
+
+        // Order phases and extract the downsample action instances per phase
+        List<Phase> orderedPhases = INSTANCE.getOrderedPhases(phasesWithDownsamplingActions);
+        var downsampleActions = orderedPhases.stream()
+            .map(phase -> Tuple.tuple(phase.getName(), (DownsampleAction) phase.getActions().get(DownsampleAction.NAME)))
+            .toList(); // Returns a list of tuples (phase name, downsample action)
+
+        var firstDownsample = downsampleActions.get(0);
+        for (int i = 1; i < downsampleActions.size(); i++) {
+            var secondDownsample = downsampleActions.get(i);
+            var firstInterval = firstDownsample.v2().fixedInterval();
+            var secondInterval = secondDownsample.v2().fixedInterval();
+            long firstMillis = firstInterval.estimateMillis();
+            long secondMillis = secondInterval.estimateMillis();
+            if (firstMillis >= secondMillis) {
+                // The later interval must be greater than the previous interval
+                throw new IllegalArgumentException(
+                    "Downsampling interval ["
+                        + secondInterval
+                        + "] for phase ["
+                        + secondDownsample.v1()
+                        + "] must be greater than the interval ["
+                        + firstInterval
+                        + "] for phase ["
+                        + firstDownsample.v1()
+                        + "]"
+                );
+            } else if (secondMillis % firstMillis != 0) {
+                // Downsampling interval must be a multiple of the source interval
+                throw new IllegalArgumentException(
+                    "Downsampling interval ["
+                        + secondInterval
+                        + "] for phase ["
+                        + secondDownsample.v1()
+                        + "] must be a multiple of the interval ["
+                        + firstInterval
+                        + "] for phase ["
+                        + firstDownsample.v1()
+                        + "]"
+                );
+            }
+            firstDownsample = secondDownsample;
+        }
+    }
+
     private static boolean definesAllocationRules(AllocateAction action) {
         return action.getRequire().isEmpty() == false || action.getInclude().isEmpty() == false || action.getExclude().isEmpty() == false;
     }

+ 95 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/TimeseriesLifecycleTypeTests.java

@@ -360,6 +360,101 @@ public class TimeseriesLifecycleTypeTests extends ESTestCase {
         }
     }
 
+    public void testValidateDownsamplingAction() {
+        {
+            Phase hotPhase = new Phase("hot", TimeValue.ZERO, Map.of(RolloverAction.NAME, TEST_ROLLOVER_ACTION));
+            Phase warmPhase = new Phase(
+                "warm",
+                TimeValue.ZERO,
+                Map.of(DownsampleAction.NAME, new DownsampleAction(DateHistogramInterval.hours(1)))
+            );
+            Phase coldPhase = new Phase(
+                "cold",
+                TimeValue.ZERO,
+                Map.of(DownsampleAction.NAME, new DownsampleAction(DateHistogramInterval.hours(1)))
+            );
+
+            IllegalArgumentException e = expectThrows(
+                IllegalArgumentException.class,
+                () -> TimeseriesLifecycleType.validateDownsamplingIntervals(List.of(warmPhase, coldPhase, hotPhase))
+            );
+            assertThat(
+                e.getMessage(),
+                is("Downsampling interval [1h] for phase [cold] must be greater than the interval [1h] for phase [warm]")
+            );
+        }
+
+        {
+            Phase warmPhase = new Phase(
+                "warm",
+                TimeValue.ZERO,
+                Map.of(DownsampleAction.NAME, new DownsampleAction(DateHistogramInterval.hours(1)))
+            );
+            Phase coldPhase = new Phase(
+                "cold",
+                TimeValue.ZERO,
+                Map.of(DownsampleAction.NAME, new DownsampleAction(DateHistogramInterval.minutes(30)))
+            );
+
+            IllegalArgumentException e = expectThrows(
+                IllegalArgumentException.class,
+                () -> TimeseriesLifecycleType.validateDownsamplingIntervals(List.of(coldPhase, warmPhase))
+            );
+            assertThat(
+                e.getMessage(),
+                is("Downsampling interval [30m] for phase [cold] must be greater than the interval [1h] for phase [warm]")
+            );
+        }
+
+        {
+            Phase warmPhase = new Phase(
+                "warm",
+                TimeValue.ZERO,
+                Map.of(DownsampleAction.NAME, new DownsampleAction(DateHistogramInterval.hours(1)))
+            );
+            Phase coldPhase = new Phase(
+                "cold",
+                TimeValue.ZERO,
+                Map.of(DownsampleAction.NAME, new DownsampleAction(DateHistogramInterval.minutes(130)))
+            );
+
+            IllegalArgumentException e = expectThrows(
+                IllegalArgumentException.class,
+                () -> TimeseriesLifecycleType.validateDownsamplingIntervals(List.of(coldPhase, warmPhase))
+            );
+            assertThat(
+                e.getMessage(),
+                is("Downsampling interval [130m] for phase [cold] must be a multiple of the interval [1h] for phase [warm]")
+            );
+        }
+
+        {
+            Phase hotPhase = new Phase(
+                "hot",
+                TimeValue.ZERO,
+                Map.of(
+                    RolloverAction.NAME,
+                    TEST_ROLLOVER_ACTION,
+                    DownsampleAction.NAME,
+                    new DownsampleAction(DateHistogramInterval.minutes(10))
+                )
+            );
+            Phase warmPhase = new Phase(
+                "warm",
+                TimeValue.ZERO,
+                Map.of(DownsampleAction.NAME, new DownsampleAction(DateHistogramInterval.minutes(30)))
+            );
+            Phase coldPhase = new Phase(
+                "cold",
+                TimeValue.ZERO,
+                Map.of(DownsampleAction.NAME, new DownsampleAction(DateHistogramInterval.hours(2)))
+            );
+
+            // This is a valid interval combination
+            TimeseriesLifecycleType.validateDownsamplingIntervals(List.of(coldPhase, warmPhase, hotPhase));
+        }
+    }
+
     public void testGetOrderedPhases() {
         Map<String, Phase> phaseMap = new HashMap<>();
         for (String phaseName : randomSubsetOf(randomIntBetween(0, ORDERED_VALID_PHASES.size()), ORDERED_VALID_PHASES)) {

+ 64 - 0
x-pack/plugin/ilm/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/ilm/70_downsampling.yml

@@ -0,0 +1,64 @@
+setup:
+  - skip:
+      version: " - 8.4.99"
+      reason: "Downsample ILM validations added in 8.5.0"
+
+  - do:
+      cluster.health:
+          wait_for_status: yellow
+
+---
+"Test downsample in hot phase without rollover":
+  - skip:
+      version: " - 8.4.99"
+      reason: "Downsample ILM validations added in 8.5.0"
+
+  - do:
+      catch: /the \[downsample\] action\(s\) may not be used in the \[hot\] phase without an accompanying \[rollover\] action/
+      ilm.put_lifecycle:
+        policy: "bad_policy"
+        body: |
+          {
+            "policy": {
+              "phases": {
+                "hot": {
+                  "min_age": "0s",
+                  "actions": {
+                    "downsample": {
+                      "fixed_interval": "3h"
+                    }
+                  }
+                }
+              }
+            }
+          }
+
+---
+"Test downsampling in multiple phases with the same interval":
+  - do:
+      catch: /Downsampling interval \[3h\] for phase \[cold\] must be greater than the interval \[3h\] for phase \[warm\]/
+      ilm.put_lifecycle:
+        policy: "bad_policy"
+        body: |
+          {
+            "policy": {
+              "phases": {
+                "warm": {
+                  "min_age": "10s",
+                  "actions": {
+                    "downsample": {
+                      "fixed_interval": "3h"
+                    }
+                  }
+                },
+                "cold": {
+                  "min_age": "30s",
+                  "actions": {
+                    "downsample": {
+                      "fixed_interval": "3h"
+                    }
+                  }
+                }
+              }
+            }
+          }

+ 2 - 2
x-pack/plugin/rollup/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/rollup/10_basic.yml

@@ -383,7 +383,7 @@ setup:
   - is_true: acknowledged
 
   - do:
-      catch: /Downsampling interval \[1h\] must be greater than the the source index interval \[1h\]/
+      catch: /Downsampling interval \[1h\] must be greater than the source index interval \[1h\]/
       indices.downsample:
         index: rollup-test
         target_index: rollup-test-2
@@ -393,7 +393,7 @@ setup:
           }
 
   - do:
-      catch: /Downsampling interval \[30m\] must be greater than the the source index interval \[1h\]/
+      catch: /Downsampling interval \[30m\] must be greater than the source index interval \[1h\]/
       indices.downsample:
         index: rollup-test
         target_index: rollup-test-2

+ 4 - 4
x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/TransportRollupAction.java

@@ -10,6 +10,7 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionRequestValidationException;
 import org.elasticsearch.action.admin.cluster.stats.MappingVisitor;
 import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest;
 import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
@@ -65,7 +66,6 @@ import org.elasticsearch.xpack.core.ClientHelper;
 import org.elasticsearch.xpack.core.downsample.DownsampleAction;
 import org.elasticsearch.xpack.core.downsample.DownsampleConfig;
 import org.elasticsearch.xpack.core.downsample.RollupIndexerAction;
-import org.elasticsearch.xpack.core.rollup.action.RollupActionRequestValidationException;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -231,7 +231,7 @@ public class TransportRollupAction extends AcknowledgedTransportMasterNodeAction
                 }
             });
 
-            RollupActionRequestValidationException validationException = new RollupActionRequestValidationException();
+            ActionRequestValidationException validationException = new ActionRequestValidationException();
             if (dimensionFields.isEmpty()) {
                 validationException.addValidationError("Index [" + sourceIndexName + "] does not contain any dimension fields");
             }
@@ -480,7 +480,7 @@ public class TransportRollupAction extends AcknowledgedTransportMasterNodeAction
     private static void validateDownsamplingInterval(MapperService mapperService, DownsampleConfig config) {
         MappedFieldType timestampFieldType = mapperService.fieldType(config.getTimestampField());
         assert timestampFieldType != null : "Cannot find timestamp field [" + config.getTimestampField() + "] in the mapping";
-        RollupActionRequestValidationException e = new RollupActionRequestValidationException();
+        ActionRequestValidationException e = new ActionRequestValidationException();
 
         Map<String, String> meta = timestampFieldType.meta();
         if (meta.isEmpty() == false) {
@@ -495,7 +495,7 @@ public class TransportRollupAction extends AcknowledgedTransportMasterNodeAction
                     e.addValidationError(
                         "Source index is a downsampled index. Downsampling interval ["
                             + targetIndexInterval
-                            + "] must be greater than the the source index interval ["
+                            + "] must be greater than the source index interval ["
                             + sourceIndexInterval
                             + "]"
                     );

+ 1 - 2
x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/downsample/DownsampleActionSingleNodeTests.java

@@ -75,7 +75,6 @@ import org.elasticsearch.xpack.core.downsample.DownsampleConfig;
 import org.elasticsearch.xpack.core.ilm.LifecycleSettings;
 import org.elasticsearch.xpack.core.ilm.RolloverAction;
 import org.elasticsearch.xpack.core.rollup.ConfigTestHelpers;
-import org.elasticsearch.xpack.core.rollup.action.RollupActionRequestValidationException;
 import org.elasticsearch.xpack.ilm.IndexLifecycle;
 import org.elasticsearch.xpack.rollup.Rollup;
 import org.junit.Before;
@@ -478,7 +477,7 @@ public class DownsampleActionSingleNodeTests extends ESSingleNodeTestCase {
 
         DownsampleConfig config = new DownsampleConfig(randomInterval());
         prepareSourceIndex(sourceIndex);
-        Exception exception = expectThrows(RollupActionRequestValidationException.class, () -> rollup(sourceIndex, rollupIndex, config));
+        Exception exception = expectThrows(ActionRequestValidationException.class, () -> rollup(sourceIndex, rollupIndex, config));
         assertThat(exception.getMessage(), containsString("does not contain any metric fields"));
     }