Sfoglia il codice sorgente

[ML][Data Frame] adds index validations to _start data frame transform (#44191)

* [ML][Data Frame] adds index validations to _start data frame transform

* addressing pr comments
Benjamin Trent 6 anni fa
parent
commit
c33b6b1070

+ 4 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/PutDataFrameTransformAction.java

@@ -64,6 +64,10 @@ public class PutDataFrameTransformAction extends ActionType<AcknowledgedResponse
             return new Request(DataFrameTransformConfig.fromXContent(parser, id, false));
         }
 
+        /**
+         * More complex validations with how {@link DataFrameTransformConfig#getDestination()} and
+         * {@link DataFrameTransformConfig#getSource()} relate are done in the transport handler.
+         */
         @Override
         public ActionRequestValidationException validate() {
             ActionRequestValidationException validationException = null;

+ 9 - 52
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPutDataFrameTransformAction.java

@@ -23,12 +23,10 @@ import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.io.stream.StreamInput;
-import org.elasticsearch.common.regex.Regex;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.license.LicenseUtils;
 import org.elasticsearch.license.XPackLicenseState;
 import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
-import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
@@ -49,16 +47,14 @@ import org.elasticsearch.xpack.core.security.authz.permission.ResourcePrivileges
 import org.elasticsearch.xpack.core.security.support.Exceptions;
 import org.elasticsearch.xpack.dataframe.notifications.DataFrameAuditor;
 import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
+import org.elasticsearch.xpack.dataframe.transforms.SourceDestValidator;
 import org.elasticsearch.xpack.dataframe.transforms.pivot.Pivot;
 
 import java.io.IOException;
 import java.time.Instant;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.stream.Collectors;
 
 public class TransportPutDataFrameTransformAction
@@ -129,58 +125,19 @@ public class TransportPutDataFrameTransformAction
                     DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_TRANSFORM_EXISTS, transformId)));
             return;
         }
-        final String destIndex = config.getDestination().getIndex();
-        Set<String> concreteSourceIndexNames = new HashSet<>();
-        for(String src : config.getSource().getIndex()) {
-            String[] concreteNames = indexNameExpressionResolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(), src);
-            if (concreteNames.length == 0) {
-                listener.onFailure(new ElasticsearchStatusException(
-                    DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_SOURCE_INDEX_MISSING, src),
-                    RestStatus.BAD_REQUEST));
-                return;
-            }
-            if (Regex.simpleMatch(src, destIndex)) {
-                listener.onFailure(new ElasticsearchStatusException(
-                    DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_DEST_IN_SOURCE, destIndex, src),
-                    RestStatus.BAD_REQUEST
-                ));
-                return;
-            }
-            concreteSourceIndexNames.addAll(Arrays.asList(concreteNames));
-        }
-
-        if (concreteSourceIndexNames.contains(destIndex)) {
-            listener.onFailure(new ElasticsearchStatusException(
-                DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_DEST_IN_SOURCE,
-                    destIndex,
-                    Strings.arrayToCommaDelimitedString(config.getSource().getIndex())),
-                RestStatus.BAD_REQUEST
-            ));
-            return;
-        }
-
-        final String[] concreteDest =
-            indexNameExpressionResolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(), destIndex);
-
-        if (concreteDest.length > 1) {
-            listener.onFailure(new ElasticsearchStatusException(
-                DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_DEST_SINGLE_INDEX, destIndex),
-                RestStatus.BAD_REQUEST
-            ));
-            return;
-        }
-        if (concreteDest.length > 0 && concreteSourceIndexNames.contains(concreteDest[0])) {
-            listener.onFailure(new ElasticsearchStatusException(
-                DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_DEST_IN_SOURCE,
-                    concreteDest[0],
-                    Strings.arrayToCommaDelimitedString(concreteSourceIndexNames.toArray(new String[0]))),
-                RestStatus.BAD_REQUEST
-            ));
+        try {
+            SourceDestValidator.check(config, clusterState, indexNameExpressionResolver);
+        } catch (ElasticsearchStatusException ex) {
+            listener.onFailure(ex);
             return;
         }
 
         // Early check to verify that the user can create the destination index and can read from the source
         if (licenseState.isAuthAllowed()) {
+            final String destIndex = config.getDestination().getIndex();
+            final String[] concreteDest = indexNameExpressionResolver.concreteIndexNames(clusterState,
+                IndicesOptions.lenientExpandOpen(),
+                config.getDestination().getIndex());
             final String username = securityContext.getUser().principal();
             List<String> srcPrivileges = new ArrayList<>(2);
             srcPrivileges.add("read");

+ 3 - 0
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformAction.java

@@ -45,6 +45,7 @@ import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskS
 import org.elasticsearch.xpack.dataframe.notifications.DataFrameAuditor;
 import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
 import org.elasticsearch.xpack.dataframe.persistence.DataframeIndex;
+import org.elasticsearch.xpack.dataframe.transforms.SourceDestValidator;
 import org.elasticsearch.xpack.dataframe.transforms.pivot.Pivot;
 
 import java.io.IOException;
@@ -185,6 +186,8 @@ public class TransportStartDataFrameTransformAction extends
                     ));
                     return;
                 }
+                // Validate source and destination indices
+                SourceDestValidator.check(config, clusterService.state(), indexNameExpressionResolver);
 
                 transformTaskHolder.set(createDataFrameTransform(config.getId(), config.getVersion(), config.getFrequency()));
                 final String destinationIndex = config.getDestination().getIndex();

+ 92 - 0
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/SourceDestValidator.java

@@ -0,0 +1,92 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.dataframe.transforms;
+
+import org.elasticsearch.ElasticsearchStatusException;
+import org.elasticsearch.action.support.IndicesOptions;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.regex.Regex;
+import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
+import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * This class contains more complex validations in regards to how {@link DataFrameTransformConfig#getSource()} and
+ * {@link DataFrameTransformConfig#getDestination()} relate to each other.
+ */
+public final class SourceDestValidator {
+
+    private SourceDestValidator() {}
+
+    /**
+     * Validates the DataFrameTransformConfiguration source and destination indices.
+     *
+     * A simple name validation is done on {@link DataFrameTransformConfig#getDestination()} inside
+     * {@link org.elasticsearch.xpack.core.dataframe.action.PutDataFrameTransformAction}
+     *
+     * So, no need to do the name checks here.
+     *
+     * @param config DataFrameTransformConfig to validate
+     * @param clusterState The current ClusterState
+     * @param indexNameExpressionResolver A valid IndexNameExpressionResolver object
+     * @throws ElasticsearchStatusException when a validation fails
+     */
+    public static void check(DataFrameTransformConfig config,
+                             ClusterState clusterState,
+                             IndexNameExpressionResolver indexNameExpressionResolver) {
+
+        final String destIndex = config.getDestination().getIndex();
+        Set<String> concreteSourceIndexNames = new HashSet<>();
+        for(String src : config.getSource().getIndex()) {
+            String[] concreteNames = indexNameExpressionResolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(), src);
+            if (concreteNames.length == 0) {
+                throw new ElasticsearchStatusException(
+                    DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_SOURCE_INDEX_MISSING, src),
+                    RestStatus.BAD_REQUEST);
+            }
+            if (Regex.simpleMatch(src, destIndex)) {
+                throw new ElasticsearchStatusException(
+                    DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_DEST_IN_SOURCE, destIndex, src),
+                    RestStatus.BAD_REQUEST);
+            }
+            concreteSourceIndexNames.addAll(Arrays.asList(concreteNames));
+        }
+
+        if (concreteSourceIndexNames.contains(destIndex)) {
+            throw new ElasticsearchStatusException(
+                DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_DEST_IN_SOURCE,
+                    destIndex,
+                    Strings.arrayToCommaDelimitedString(config.getSource().getIndex())),
+                RestStatus.BAD_REQUEST
+            );
+        }
+
+        final String[] concreteDest =
+            indexNameExpressionResolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(), destIndex);
+
+        if (concreteDest.length > 1) {
+            throw new ElasticsearchStatusException(
+                DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_DEST_SINGLE_INDEX, destIndex),
+                RestStatus.BAD_REQUEST
+            );
+        }
+        if (concreteDest.length > 0 && concreteSourceIndexNames.contains(concreteDest[0])) {
+            throw new ElasticsearchStatusException(
+                DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_DEST_IN_SOURCE,
+                    concreteDest[0],
+                    Strings.arrayToCommaDelimitedString(concreteSourceIndexNames.toArray(new String[0]))),
+                RestStatus.BAD_REQUEST
+            );
+        }
+    }
+}

+ 147 - 0
x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/SourceDestValidatorTests.java

@@ -0,0 +1,147 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.dataframe.transforms;
+
+import org.elasticsearch.ElasticsearchStatusException;
+import org.elasticsearch.Version;
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.AliasMetaData;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.cluster.metadata.MetaData;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
+import org.elasticsearch.xpack.core.dataframe.transforms.DestConfig;
+import org.elasticsearch.xpack.core.dataframe.transforms.SourceConfig;
+import org.elasticsearch.xpack.core.dataframe.transforms.pivot.PivotConfigTests;
+
+import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_CREATION_DATE;
+import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
+import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
+import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_VERSION_CREATED;
+import static org.hamcrest.Matchers.equalTo;
+
+public class SourceDestValidatorTests extends ESTestCase {
+
+    private static final String SOURCE_1 = "source-1";
+    private static final String SOURCE_2 = "source-2";
+    private static final String ALIASED_DEST = "aliased-dest";
+
+    private static final ClusterState CLUSTER_STATE;
+
+    static {
+        IndexMetaData source1 = IndexMetaData.builder(SOURCE_1).settings(Settings.builder()
+            .put(SETTING_VERSION_CREATED, Version.CURRENT)
+            .put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 0)
+            .put(SETTING_CREATION_DATE, System.currentTimeMillis()))
+            .putAlias(AliasMetaData.builder("source-1-alias").build())
+            .build();
+        IndexMetaData source2 = IndexMetaData.builder(SOURCE_2).settings(Settings.builder()
+            .put(SETTING_VERSION_CREATED, Version.CURRENT)
+            .put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 0)
+            .put(SETTING_CREATION_DATE, System.currentTimeMillis()))
+            .putAlias(AliasMetaData.builder("dest-alias").build())
+            .build();
+        IndexMetaData aliasedDest = IndexMetaData.builder(ALIASED_DEST).settings(Settings.builder()
+            .put(SETTING_VERSION_CREATED, Version.CURRENT)
+            .put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 0)
+            .put(SETTING_CREATION_DATE, System.currentTimeMillis()))
+            .putAlias(AliasMetaData.builder("dest-alias").build())
+            .build();
+        ClusterState.Builder state = ClusterState.builder(new ClusterName("test"));
+        state.metaData(MetaData.builder()
+            .put(IndexMetaData.builder(source1).build(), false)
+            .put(IndexMetaData.builder(source2).build(), false)
+            .put(IndexMetaData.builder(aliasedDest).build(), false));
+        CLUSTER_STATE = state.build();
+    }
+
+    public void testCheck_GivenSimpleSourceIndexAndValidDestIndex() {
+        DataFrameTransformConfig config = createDataFrameTransform(new SourceConfig(SOURCE_1), new DestConfig("dest", null));
+        SourceDestValidator.check(config, CLUSTER_STATE, new IndexNameExpressionResolver());
+    }
+
+    public void testCheck_GivenMissingConcreteSourceIndex() {
+        DataFrameTransformConfig config = createDataFrameTransform(new SourceConfig("missing"), new DestConfig("dest", null));
+
+        ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
+            () -> SourceDestValidator.check(config, CLUSTER_STATE, new IndexNameExpressionResolver()));
+        assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
+        assertThat(e.getMessage(), equalTo("Source index [missing] does not exist"));
+    }
+
+    public void testCheck_GivenMissingWildcardSourceIndex() {
+        DataFrameTransformConfig config = createDataFrameTransform(new SourceConfig("missing*"), new DestConfig("dest", null));
+
+        ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
+            () -> SourceDestValidator.check(config, CLUSTER_STATE, new IndexNameExpressionResolver()));
+        assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
+        assertThat(e.getMessage(), equalTo("Source index [missing*] does not exist"));
+    }
+
+    public void testCheck_GivenDestIndexSameAsSourceIndex() {
+        DataFrameTransformConfig config = createDataFrameTransform(new SourceConfig(SOURCE_1), new DestConfig("source-1", null));
+
+        ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
+            () -> SourceDestValidator.check(config, CLUSTER_STATE, new IndexNameExpressionResolver()));
+        assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
+        assertThat(e.getMessage(), equalTo("Destination index [source-1] is included in source expression [source-1]"));
+    }
+
+    public void testCheck_GivenDestIndexMatchesSourceIndex() {
+        DataFrameTransformConfig config = createDataFrameTransform(new SourceConfig("source-*"), new DestConfig(SOURCE_2, null));
+
+        ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
+            () -> SourceDestValidator.check(config, CLUSTER_STATE, new IndexNameExpressionResolver()));
+        assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
+        assertThat(e.getMessage(), equalTo("Destination index [source-2] is included in source expression [source-*]"));
+    }
+
+    public void testCheck_GivenDestIndexMatchesOneOfSourceIndices() {
+        DataFrameTransformConfig config = createDataFrameTransform(new SourceConfig("source-1", "source-*"),
+            new DestConfig(SOURCE_2, null));
+
+        ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
+            () -> SourceDestValidator.check(config, CLUSTER_STATE, new IndexNameExpressionResolver()));
+        assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
+        assertThat(e.getMessage(), equalTo("Destination index [source-2] is included in source expression [source-*]"));
+    }
+
+    public void testCheck_GivenDestIndexIsAliasThatMatchesMultipleIndices() {
+        DataFrameTransformConfig config = createDataFrameTransform(new SourceConfig(SOURCE_1), new DestConfig("dest-alias", null));
+
+        ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
+            () -> SourceDestValidator.check(config, CLUSTER_STATE, new IndexNameExpressionResolver()));
+        assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
+        assertThat(e.getMessage(),
+            equalTo("Destination index [dest-alias] should refer to a single index"));
+    }
+
+    public void testCheck_GivenDestIndexIsAliasThatIsIncludedInSource() {
+        DataFrameTransformConfig config = createDataFrameTransform(new SourceConfig(SOURCE_1), new DestConfig("source-1-alias", null));
+
+        ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
+            () -> SourceDestValidator.check(config, CLUSTER_STATE, new IndexNameExpressionResolver()));
+        assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
+        assertThat(e.getMessage(),
+            equalTo("Destination index [source-1] is included in source expression [source-1]"));
+    }
+
+    private static DataFrameTransformConfig createDataFrameTransform(SourceConfig sourceConfig, DestConfig destConfig) {
+        return new DataFrameTransformConfig("test",
+            sourceConfig,
+            destConfig,
+            TimeValue.timeValueSeconds(60),
+            null,
+            null,
+            PivotConfigTests.randomPivotConfig(),
+            null);
+    }
+}