Browse Source

[Transform] prevent old beta transforms from starting (#79712)

Disallow old beta transforms (< 7.5) from starting in >8.0. Transform can still be read and
updated/upgraded.
Hendrik Muhs 4 years ago
parent
commit
f64b18413a

+ 4 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformDeprecations.java

@@ -7,8 +7,12 @@
 
 package org.elasticsearch.xpack.core.transform;
 
+import org.elasticsearch.Version;
+
 public class TransformDeprecations {
 
+    public static final Version MIN_TRANSFORM_VERSION = Version.V_7_5_0;
+
     public static final String UPGRADE_TRANSFORM_URL = "https://ela.st/es-8-upgrade-transforms";
 
     // breaking changes base url for the _next_ major release

+ 1 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfig.java

@@ -393,7 +393,7 @@ public class TransformConfig extends AbstractDiffable<TransformConfig> implement
         List<DeprecationIssue> deprecations = new ArrayList<>();
 
         // deprecate beta transforms
-        if (getVersion() == null || getVersion().before(Version.V_7_5_0)) {
+        if (getVersion() == null || getVersion().before(TransformDeprecations.MIN_TRANSFORM_VERSION)) {
             deprecations.add(
                 new DeprecationIssue(
                     Level.CRITICAL,

+ 155 - 0
x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/integration/TransformOldTransformsIT.java

@@ -0,0 +1,155 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.transform.integration;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.action.DocWriteResponse;
+import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.index.IndexResponse;
+import org.elasticsearch.action.support.WriteRequest;
+import org.elasticsearch.action.support.master.AcknowledgedRequest;
+import org.elasticsearch.common.ValidationException;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.test.VersionUtils;
+import org.elasticsearch.xcontent.XContentBuilder;
+import org.elasticsearch.xcontent.XContentFactory;
+import org.elasticsearch.xcontent.XContentType;
+import org.elasticsearch.xpack.core.ClientHelper;
+import org.elasticsearch.xpack.core.XPackSettings;
+import org.elasticsearch.xpack.core.transform.TransformDeprecations;
+import org.elasticsearch.xpack.core.transform.TransformField;
+import org.elasticsearch.xpack.core.transform.action.GetTransformAction;
+import org.elasticsearch.xpack.core.transform.action.StartTransformAction;
+import org.elasticsearch.xpack.core.transform.action.StopTransformAction;
+import org.elasticsearch.xpack.core.transform.action.UpdateTransformAction;
+import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
+import org.elasticsearch.xpack.core.transform.transforms.TransformConfigUpdate;
+import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants;
+import org.elasticsearch.xpack.transform.TransformSingleNodeTestCase;
+import org.elasticsearch.xpack.transform.persistence.TransformInternalIndex;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+
+public class TransformOldTransformsIT extends TransformSingleNodeTestCase {
+
+    private static final String OLD_INDEX = TransformInternalIndexConstants.INDEX_PATTERN + "001";
+
+    @Override
+    protected Settings nodeSettings() {
+        // TODO Change this to run with security enabled
+        // https://github.com/elastic/elasticsearch/issues/75940
+        return Settings.builder().put(super.nodeSettings()).put(XPackSettings.SECURITY_ENABLED.getKey(), false).build();
+    }
+
+    /**
+     * Create an old transform and check that it can not be started, but updated and than started
+     */
+    public void testStopThrowsForDeprecatedTransformConfig() throws Exception {
+
+        // The mapping does not need to actually be the "OLD" mapping, we are testing that the old doc gets deleted, and the new one
+        // created.
+        try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
+            builder.startObject();
+            builder.field(TransformInternalIndex.DYNAMIC, "false");
+            builder.startObject("properties");
+            builder.startObject(TransformField.INDEX_DOC_TYPE.getPreferredName()).field("type", "keyword").endObject();
+            TransformInternalIndex.addTransformsConfigMappings(builder);
+            builder.endObject();
+            builder.endObject();
+            client().admin()
+                .indices()
+                .create(new CreateIndexRequest(OLD_INDEX).mapping(builder).origin(ClientHelper.TRANSFORM_ORIGIN))
+                .actionGet();
+        }
+        String transformIndex = "transform-index";
+        createSourceIndex(transformIndex);
+        String transformId = "transform-throws-for-old-config";
+        Version transformVersion = VersionUtils.randomVersionBetween(
+            random(),
+            Version.V_7_2_0,
+            VersionUtils.getPreviousVersion(TransformDeprecations.MIN_TRANSFORM_VERSION)
+        );
+        String config = "{\"dest\": {\"index\":\"bar\"},"
+            + " \"source\": {\"index\":\""
+            + transformIndex
+            + "\", \"query\": {\"match_all\":{}}},"
+            + " \"id\": \""
+            + transformId
+            + "\","
+            + " \"doc_type\": \"data_frame_transform_config\","
+            + " \"pivot\": {"
+            + "   \"group_by\": {"
+            + "     \"reviewer\": {"
+            + "       \"terms\": {"
+            + "         \"field\": \"user_id\""
+            + " } } },"
+            + "   \"aggregations\": {"
+            + "     \"avg_rating\": {"
+            + "       \"avg\": {"
+            + "         \"field\": \"stars\""
+            + " } } } },"
+            + "\"frequency\":\"1s\","
+            + "\"version\":\""
+            + transformVersion
+            + "\""
+            + "}";
+        IndexRequest indexRequest = new IndexRequest(OLD_INDEX).id(TransformConfig.documentId(transformId))
+            .source(config, XContentType.JSON)
+            .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
+        IndexResponse indexResponse = client().index(indexRequest).actionGet();
+        assertThat(indexResponse.getResult(), is(DocWriteResponse.Result.CREATED));
+
+        GetTransformAction.Request getTransformRequest = new GetTransformAction.Request(transformId);
+        GetTransformAction.Response getTransformResponse = client().execute(GetTransformAction.INSTANCE, getTransformRequest).actionGet();
+        assertThat(getTransformResponse.getTransformConfigurations().get(0).getId(), equalTo(transformId));
+        assertThat(getTransformResponse.getTransformConfigurations().get(0).getVersion(), equalTo(transformVersion));
+
+        StartTransformAction.Request startTransformRequest = new StartTransformAction.Request(
+            transformId,
+            AcknowledgedRequest.DEFAULT_ACK_TIMEOUT
+        );
+
+        ValidationException validationException = expectThrows(
+            ValidationException.class,
+            () -> client().execute(StartTransformAction.INSTANCE, startTransformRequest).actionGet()
+        );
+
+        assertThat(validationException.getMessage(), containsString("Transform configuration is too old"));
+
+        UpdateTransformAction.Request updateTransformActionRequest = new UpdateTransformAction.Request(
+            new TransformConfigUpdate(null, null, null, null, "updated", null, null, null),
+            transformId,
+            false,
+            AcknowledgedRequest.DEFAULT_ACK_TIMEOUT
+        );
+        UpdateTransformAction.Response updateTransformActionResponse = client().execute(
+            UpdateTransformAction.INSTANCE,
+            updateTransformActionRequest
+        ).actionGet();
+        assertThat(updateTransformActionResponse.getConfig().getId(), equalTo(transformId));
+        assertThat(updateTransformActionResponse.getConfig().getDescription(), equalTo("updated"));
+
+        StartTransformAction.Response startTransformActionResponse = client().execute(StartTransformAction.INSTANCE, startTransformRequest)
+            .actionGet();
+
+        assertTrue(startTransformActionResponse.isAcknowledged());
+
+        StopTransformAction.Response stopTransformActionResponse = client().execute(
+            StopTransformAction.INSTANCE,
+            new StopTransformAction.Request(transformId, true, false, AcknowledgedRequest.DEFAULT_ACK_TIMEOUT, false, false)
+        ).actionGet();
+        assertTrue(stopTransformActionResponse.isAcknowledged());
+    }
+
+    private void createSourceIndex(String index) {
+        client().admin().indices().create(new CreateIndexRequest(index)).actionGet();
+    }
+}

+ 17 - 0
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportValidateTransformAction.java

@@ -7,6 +7,7 @@
 
 package org.elasticsearch.xpack.transform.action;
 
+import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.HandledTransportAction;
@@ -15,6 +16,7 @@ import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.ValidationException;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.ingest.IngestService;
@@ -23,6 +25,7 @@ import org.elasticsearch.license.RemoteClusterLicenseChecker;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.core.common.validation.SourceDestValidator;
+import org.elasticsearch.xpack.core.transform.TransformDeprecations;
 import org.elasticsearch.xpack.core.transform.TransformMessages;
 import org.elasticsearch.xpack.core.transform.action.ValidateTransformAction;
 import org.elasticsearch.xpack.core.transform.action.ValidateTransformAction.Request;
@@ -108,6 +111,20 @@ public class TransportValidateTransformAction extends HandledTransportAction<Req
         final TransformConfig config = request.getConfig();
         final Function function = FunctionFactory.create(config);
 
+        if (config.getVersion() == null || config.getVersion().before(TransformDeprecations.MIN_TRANSFORM_VERSION)) {
+            listener.onFailure(
+                new ValidationException().addValidationError(
+                    new ParameterizedMessage(
+                        "Transform configuration is too old [{}], use the upgrade API to fix your transform. "
+                            + "Minimum required version is [{}]",
+                        config.getVersion(),
+                        TransformDeprecations.MIN_TRANSFORM_VERSION
+                    ).getFormattedMessage()
+                )
+            );
+            return;
+        }
+
         // <5> Final listener
         ActionListener<Map<String, String>> deduceMappingsListener = ActionListener.wrap(
             deducedMappings -> {

+ 16 - 0
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java

@@ -9,6 +9,7 @@ package org.elasticsearch.xpack.transform.transforms;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.apache.lucene.util.SetOnce;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.ResourceNotFoundException;
@@ -32,6 +33,7 @@ import org.elasticsearch.persistent.PersistentTasksExecutor;
 import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.core.indexing.IndexerState;
+import org.elasticsearch.xpack.core.transform.TransformDeprecations;
 import org.elasticsearch.xpack.core.transform.TransformField;
 import org.elasticsearch.xpack.core.transform.TransformMessages;
 import org.elasticsearch.xpack.core.transform.TransformMetadata;
@@ -270,6 +272,20 @@ public class TransformPersistentTasksExecutor extends PersistentTasksExecutor<Tr
 
         // <3> Validate the transform, assigning it to the indexer, and get the previous stats (if they exist)
         ActionListener<TransformConfig> getTransformConfigListener = ActionListener.wrap(config -> {
+
+            // fail if a transform is too old, this can only happen on a rolling upgrade
+            if (config.getVersion() == null || config.getVersion().before(TransformDeprecations.MIN_TRANSFORM_VERSION)) {
+                String transformTooOldError = new ParameterizedMessage(
+                    "Transform configuration is too old [{}], use the upgrade API to fix your transform. "
+                        + "Minimum required version is [{}]",
+                    config.getVersion(),
+                    TransformDeprecations.MIN_TRANSFORM_VERSION
+                ).getFormattedMessage();
+                auditor.error(transformId, transformTooOldError);
+                markAsFailed(buildTask, transformTooOldError);
+                return;
+            }
+
             ValidationException validationException = config.validate(null);
             if (validationException == null) {
                 indexerBuilder.setTransformConfig(config);